|
|
import pickle |
|
|
import json |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Optional, Union |
|
|
import warnings |
|
|
|
|
|
warnings.filterwarnings('ignore') |
|
|
|
|
|
|
|
|
class EarlyWarningPredictor:
    """Early-warning prediction model for small-business closure risk.

    Loads a pre-trained ensemble (XGBoost + LightGBM, optional CatBoost)
    together with label encoders, the canonical feature list and a config
    dict from a model directory, then scores a single store (or a batch)
    and returns a risk score, risk level, contributing factors and
    suggested action items.
    """

    def __init__(self, model_path: Optional[str] = None):
        """Initialize an (unloaded) predictor.

        Args:
            model_path: Directory containing the serialized artifacts.
                Defaults to ``<package root>/model``.
        """
        self.model_path = Path(model_path) if model_path else Path(__file__).parent.parent / 'model'
        self.xgb_model = None        # pickled XGBoost classifier
        self.lgb_model = None        # pickled LightGBM classifier
        self.catboost_model = None   # optional pickled CatBoost classifier
        self.label_encoders = {}     # fitted label encoders, keyed by column
        self.feature_names = []      # feature order the models were trained on
        self.config = {}             # threshold, ensemble_weights, model_version, ...
        self.is_loaded = False

    @classmethod
    def from_pretrained(cls, model_name_or_path: str):
        """Construct a predictor and eagerly load its artifacts."""
        predictor = cls(model_path=model_name_or_path)
        predictor.load_model()
        return predictor

    def _load_pickle(self, filename: str):
        """Unpickle ``model_path/filename``; return None when the file is absent.

        SECURITY NOTE(review): ``pickle.load`` executes arbitrary code during
        deserialization — only load model files from a trusted source.
        """
        path = self.model_path / filename
        if not path.exists():
            return None
        with open(path, 'rb') as f:
            return pickle.load(f)

    def _load_json(self, filename: str, default):
        """Parse ``model_path/filename`` as JSON; return ``default`` when absent."""
        path = self.model_path / filename
        if not path.exists():
            return default
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def load_model(self):
        """Load models, encoders, feature names and config from disk.

        Files that are missing are simply skipped (the corresponding
        attribute keeps its current value), mirroring a partially
        exported model directory.

        Raises:
            FileNotFoundError: if the model directory itself is missing.
        """
        if not self.model_path.exists():
            raise FileNotFoundError(f"Model directory not found: {self.model_path}")

        # Load each pickled artifact only if present; keep prior value otherwise.
        for attr, fname in (('xgb_model', 'xgboost_model.pkl'),
                            ('lgb_model', 'lightgbm_model.pkl'),
                            ('catboost_model', 'catboost_model.pkl'),
                            ('label_encoders', 'label_encoders.pkl')):
            obj = self._load_pickle(fname)
            if obj is not None:
                setattr(self, attr, obj)

        self.feature_names = self._load_json('feature_names.json', self.feature_names)
        self.config = self._load_json('config.json', self.config)

        self.is_loaded = True
        print(f"모델 로드 완료: v{self.config.get('model_version', '2.0')}")

    def predict(self, store_data: Dict,
                monthly_usage: Optional[pd.DataFrame] = None,
                monthly_customers: Optional[pd.DataFrame] = None,
                threshold: Optional[float] = None) -> Dict:
        """Score a single store's closure risk.

        Args:
            store_data: Raw store attributes (industry, avg_sales, ...).
            monthly_usage: Optional monthly usage history; full feature
                engineering runs only when both history frames are given.
            monthly_customers: Optional monthly customer history.
            threshold: Decision threshold for ``is_at_risk``; defaults to
                the configured threshold (or 0.5). An explicit 0.0 is
                respected.

        Returns:
            Dict with risk score/level/color, closure probability,
            threshold decision, confidence, risk factors and action items.
        """
        if not self.is_loaded:
            self.load_model()

        if monthly_usage is None or monthly_customers is None:
            # No time series available — fall back to coarse features.
            features = self._create_simple_features(store_data)
        else:
            # BUGFIX: import and construct the engineer lazily, only on the
            # path that needs it, so the simple path works even when
            # src.feature_engineering is unavailable.
            from src.feature_engineering import FeatureEngineer
            engineer = FeatureEngineer()
            features = engineer.create_features(store_data, monthly_usage, monthly_customers)

        features = self._align_features(features)

        # BUGFIX: `threshold or default` treated an explicit 0.0 as unset;
        # compare against None instead.
        if threshold is None:
            threshold = self.config.get('threshold', 0.5)

        if self.xgb_model is not None and self.lgb_model is not None:
            xgb_prob = self.xgb_model.predict_proba(features)[0][1]
            lgb_prob = self.lgb_model.predict_proba(features)[0][1]

            weights = self.config.get('ensemble_weights', [0.5, 0.5])
            closure_probability = weights[0] * xgb_prob + weights[1] * lgb_prob

            # Three-way ensemble when CatBoost and a third weight both exist.
            if self.catboost_model is not None and len(weights) > 2:
                cat_prob = self.catboost_model.predict_proba(features)[0][1]
                closure_probability = (weights[0] * xgb_prob +
                                       weights[1] * lgb_prob +
                                       weights[2] * cat_prob)
        else:
            # Without both core models we cannot score; stay neutral.
            closure_probability = 0.5

        risk_score = closure_probability * 100

        # Bucket the 0-100 score into three bands for display.
        if risk_score < 30:
            risk_level = '낮음'
            risk_color = 'green'
        elif risk_score < 60:
            risk_level = '보통'
            risk_color = 'yellow'
        else:
            risk_level = '높음'
            risk_color = 'red'

        result = {
            'risk_score': round(risk_score, 2),
            'risk_level': risk_level,
            'risk_color': risk_color,
            'closure_probability': round(closure_probability, 4),
            'is_at_risk': closure_probability > threshold,
            'threshold': threshold,
            'confidence': max(closure_probability, 1 - closure_probability),
            'model_version': self.config.get('model_version', '2.0')
        }

        # Feature-importance based explanation requires the XGBoost model.
        if self.xgb_model is not None:
            result['risk_factors'] = self._analyze_risk_factors(features)

        result['action_items'] = self._generate_action_items(result, store_data)

        return result

    def predict_batch(self, stores_df: pd.DataFrame) -> pd.DataFrame:
        """Score every row of ``stores_df``; returns one result row per store."""
        results = []

        for idx, row in stores_df.iterrows():
            result = self.predict(row.to_dict())
            # Fall back to the frame index when no explicit store_id column.
            result['store_id'] = row.get('store_id', idx)
            results.append(result)

        return pd.DataFrame(results)

    def explain(self, store_data: Dict, top_n: int = 10) -> Dict:
        """Return the prediction plus top risk factors and a text interpretation."""
        result = self.predict(store_data)

        return {
            'prediction': result,
            'top_features': result.get('risk_factors', {}),
            'interpretation': self._interpret_prediction(result)
        }

    def _create_simple_features(self, store_data: Dict) -> pd.DataFrame:
        """Build a one-row feature frame from coarse store attributes.

        Used when no monthly history is available; unknown features are
        zero-filled so the frame matches the trained feature set.
        """
        features = {
            'sales_avg_all': store_data.get('avg_sales', 50),
            'customer_reuse_rate': store_data.get('reuse_rate', 25),
            'operation_months': store_data.get('operating_months', 12),
            'trend_slope': store_data.get('sales_trend', 0),
        }

        for fname in self.feature_names:
            features.setdefault(fname, 0)

        return pd.DataFrame([features])

    def _align_features(self, features: pd.DataFrame) -> pd.DataFrame:
        """Reorder columns to the trained feature order; zero-fill missing ones."""
        aligned = pd.DataFrame()

        for fname in self.feature_names:
            aligned[fname] = features[fname] if fname in features.columns else 0

        # Impute残留 NaNs with the column median (or 0 when the median is NaN).
        aligned = aligned.fillna(aligned.median().fillna(0))

        return aligned

    def _analyze_risk_factors(self, features: pd.DataFrame) -> Dict[str, float]:
        """Rank the top contributing features, mapped to readable names.

        Contribution is a heuristic: importance * |value| * 10, capped at 100.
        """
        if not hasattr(self.xgb_model, 'feature_importances_'):
            return {}

        importance = self.xgb_model.feature_importances_
        feature_values = features.iloc[0].values

        contributions = {}

        # zip() guards against any length mismatch between the three arrays.
        for fname, imp, value in zip(self.feature_names, importance, feature_values):
            if imp > 0.01:  # skip near-zero-importance features
                score = imp * abs(value) * 10
                readable_name = self._translate_feature_name(fname)
                contributions[readable_name] = min(round(score, 1), 100)

        sorted_factors = sorted(contributions.items(), key=lambda x: x[1], reverse=True)[:6]

        return dict(sorted_factors)

    def _translate_feature_name(self, fname: str) -> str:
        """Map a raw feature name to a human-readable (Korean) label.

        Uses substring matching; returns the raw name when nothing matches.
        """
        translations = {
            'sales_avg': '매출',
            'trend_slope': '매출 추세',
            'trend_consecutive_down': '연속 하락',
            'customer_reuse_rate': '재이용률',
            'volatility_cv': '매출 변동성',
            'operation_months': '영업 기간',
            'sales_recent_vs_previous': '최근 매출 변화'
        }

        for key, value in translations.items():
            if key in fname:
                return value

        return fname

    def _generate_action_items(self, result: Dict, store_data: Dict) -> List[str]:
        """Return three recommended actions for the store's risk band."""
        actions = []

        risk_score = result['risk_score']

        if risk_score > 70:
            actions.append("즉시 조치 필요: 비용 절감 및 매출 증대 방안 마련")
            actions.append("현금흐름 개선: 외상 매출 회수 및 재고 최적화")
            actions.append("전문가 상담: 경영 컨설팅 및 구조조정 검토")
        elif risk_score > 40:
            actions.append("매출 분석: 주력 상품/서비스 재점검")
            actions.append("마케팅 강화: 신규 고객 유치 캠페인")
            actions.append("차별화 전략: 경쟁력 있는 요소 발굴 및 강화")
        else:
            actions.append("현재 상태 유지: 정기적인 모니터링 지속")
            actions.append("성장 기회 탐색: 추가 매출원 발굴")
            actions.append("고객 충성도 강화: 멤버십 프로그램 등")

        return actions

    def _interpret_prediction(self, result: Dict) -> str:
        """Return a one-sentence Korean interpretation of the prediction."""
        risk_level = result['risk_level']
        risk_score = result['risk_score']

        if risk_level == '높음':
            return f"위험도가 매우 높습니다 ({risk_score:.1f}점). 폐업 위험이 높으므로 즉각적인 대응이 필요합니다."
        elif risk_level == '보통':
            return f"주의가 필요합니다 ({risk_score:.1f}점). 개선 방안을 마련하여 위험을 줄이세요."
        else:
            return f"안정적입니다 ({risk_score:.1f}점). 현재의 운영 방식을 유지하면서 지속적으로 모니터링하세요."

    def get_model_info(self) -> Dict:
        """Return model metadata and which ensemble members are loaded."""
        return {
            'version': self.config.get('model_version', '2.0'),
            'n_features': self.config.get('n_features', 0),
            'performance': self.config.get('performance', {}),
            'ensemble_weights': self.config.get('ensemble_weights', []),
            'models': {
                'xgboost': self.xgb_model is not None,
                'lightgbm': self.lgb_model is not None,
                'catboost': self.catboost_model is not None
            }
        }
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: load the trained artifacts and score one sample store.
    banner = "=" * 70
    print(banner)
    print("Early Warning Predictor v2.0 테스트")
    print(banner)

    predictor = EarlyWarningPredictor(model_path='../model')

    try:
        predictor.load_model()

        sample_store = {
            'store_id': 'TEST_001',
            'industry': '카페',
            'location': '서울 강남구',
            'avg_sales': 45,
            'reuse_rate': 22.5,
            'operating_months': 18,
            'sales_trend': -0.05,
        }

        prediction = predictor.predict(sample_store)

        print("\n예측 결과:")
        print(f"  위험도 점수: {prediction['risk_score']}/100")
        print(f"  위험 등급: {prediction['risk_level']}")
        print(f"  폐업 확률: {prediction['closure_probability']:.1%}")

        if 'risk_factors' in prediction:
            print("\n주요 위험 요인:")
            for factor_name, factor_score in prediction['risk_factors'].items():
                print(f"  - {factor_name}: {factor_score:.1f}점")

        print("\n액션 아이템:")
        for item in prediction['action_items']:
            print(f"  {item}")

    except FileNotFoundError:
        # Artifacts not exported yet — point the user at training first.
        print("모델 파일이 없습니다. 먼저 모델을 학습해주세요.")