early_warning_model / src /predictor.py
LLouis0622's picture
Upload folder using huggingface_hub
5092c1e verified
import pickle
import json
import numpy as np
import pandas as pd
from pathlib import Path
from typing import Dict, List, Optional, Union
import warnings
warnings.filterwarnings('ignore')
class EarlyWarningPredictor:
"""자영업 조기경보 예측 모델"""
def __init__(self, model_path: Optional[str] = None):
self.model_path = Path(model_path) if model_path else Path(__file__).parent.parent / 'model'
self.xgb_model = None
self.lgb_model = None
self.catboost_model = None
self.label_encoders = {}
self.feature_names = []
self.config = {}
self.is_loaded = False
@classmethod
def from_pretrained(cls, model_name_or_path: str):
predictor = cls(model_path=model_name_or_path)
predictor.load_model()
return predictor
def load_model(self):
"""모델 및 설정 로드"""
if not self.model_path.exists():
raise FileNotFoundError(f"Model directory not found: {self.model_path}")
# XGBoost 로드
xgb_path = self.model_path / 'xgboost_model.pkl'
if xgb_path.exists():
with open(xgb_path, 'rb') as f:
self.xgb_model = pickle.load(f)
# LightGBM 로드
lgb_path = self.model_path / 'lightgbm_model.pkl'
if lgb_path.exists():
with open(lgb_path, 'rb') as f:
self.lgb_model = pickle.load(f)
# CatBoost 로드
catboost_path = self.model_path / 'catboost_model.pkl'
if catboost_path.exists():
with open(catboost_path, 'rb') as f:
self.catboost_model = pickle.load(f)
# Label Encoders 로드
le_path = self.model_path / 'label_encoders.pkl'
if le_path.exists():
with open(le_path, 'rb') as f:
self.label_encoders = pickle.load(f)
# Feature names 로드
fn_path = self.model_path / 'feature_names.json'
if fn_path.exists():
with open(fn_path, 'r', encoding='utf-8') as f:
self.feature_names = json.load(f)
# Config 로드
config_path = self.model_path / 'config.json'
if config_path.exists():
with open(config_path, 'r', encoding='utf-8') as f:
self.config = json.load(f)
self.is_loaded = True
print(f"모델 로드 완료: v{self.config.get('model_version', '2.0')}")
def predict(self, store_data: Dict,
monthly_usage: Optional[pd.DataFrame] = None,
monthly_customers: Optional[pd.DataFrame] = None,
threshold: Optional[float] = None) -> Dict:
if not self.is_loaded:
self.load_model()
# 특징 생성
from src.feature_engineering import FeatureEngineer
engineer = FeatureEngineer()
if monthly_usage is None or monthly_customers is None:
# 간단한 데이터 형식
features = self._create_simple_features(store_data)
else:
# 전체 특징 생성
features = engineer.create_features(store_data, monthly_usage, monthly_customers)
# 특징 정렬 및 결측치 처리
features = self._align_features(features)
# 예측
threshold = threshold or self.config.get('threshold', 0.5)
if self.xgb_model and self.lgb_model:
# 앙상블 예측
xgb_prob = self.xgb_model.predict_proba(features)[0][1]
lgb_prob = self.lgb_model.predict_proba(features)[0][1]
weights = self.config.get('ensemble_weights', [0.5, 0.5])
closure_probability = weights[0] * xgb_prob + weights[1] * lgb_prob
if self.catboost_model and len(weights) > 2:
cat_prob = self.catboost_model.predict_proba(features)[0][1]
closure_probability = (weights[0] * xgb_prob +
weights[1] * lgb_prob +
weights[2] * cat_prob)
else:
closure_probability = 0.5
# 위험도 점수(0-100)
risk_score = closure_probability * 100
# 위험 등급
if risk_score < 30:
risk_level = '낮음'
risk_color = 'green'
elif risk_score < 60:
risk_level = '보통'
risk_color = 'yellow'
else:
risk_level = '높음'
risk_color = 'red'
# 예측 결과
result = {
'risk_score': round(risk_score, 2),
'risk_level': risk_level,
'risk_color': risk_color,
'closure_probability': round(closure_probability, 4),
'is_at_risk': closure_probability > threshold,
'threshold': threshold,
'confidence': max(closure_probability, 1 - closure_probability),
'model_version': self.config.get('model_version', '2.0')
}
# 위험 요인 분석(특징 중요도 기반)
if self.xgb_model:
result['risk_factors'] = self._analyze_risk_factors(features)
# 액션 아이템
result['action_items'] = self._generate_action_items(result, store_data)
return result
def predict_batch(self, stores_df: pd.DataFrame) -> pd.DataFrame:
results = []
for idx, row in stores_df.iterrows():
store_data = row.to_dict()
result = self.predict(store_data)
result['store_id'] = row.get('store_id', idx)
results.append(result)
return pd.DataFrame(results)
def explain(self, store_data: Dict, top_n: int = 10) -> Dict:
# SHAP 분석(간단한 버전)
result = self.predict(store_data)
explanation = {
'prediction': result,
'top_features': result.get('risk_factors', {}),
'interpretation': self._interpret_prediction(result)
}
return explanation
def _create_simple_features(self, store_data: Dict) -> pd.DataFrame:
"""간단한 특징 생성"""
# 기본 특징만 사용
features = {
'sales_avg_all': store_data.get('avg_sales', 50),
'customer_reuse_rate': store_data.get('reuse_rate', 25),
'operation_months': store_data.get('operating_months', 12),
'trend_slope': store_data.get('sales_trend', 0),
}
# 나머지 특징은 기본값으로
for fname in self.feature_names:
if fname not in features:
features[fname] = 0
return pd.DataFrame([features])
def _align_features(self, features: pd.DataFrame) -> pd.DataFrame:
"""특징 정렬 및 전처리"""
# 모델 학습 시 사용한 특징 순서로 정렬
aligned = pd.DataFrame()
for fname in self.feature_names:
if fname in features.columns:
aligned[fname] = features[fname]
else:
aligned[fname] = 0
# 결측치 처리
aligned = aligned.fillna(aligned.median().fillna(0))
return aligned
def _analyze_risk_factors(self, features: pd.DataFrame) -> Dict[str, float]:
"""위험 요인 분석"""
# 특징 중요도 기반
if not hasattr(self.xgb_model, 'feature_importances_'):
return {}
importance = self.xgb_model.feature_importances_
feature_values = features.iloc[0].values
# 중요도와 값을 곱해서 기여도 계산
contributions = {}
for i, fname in enumerate(self.feature_names):
if importance[i] > 0.01: # 중요한 특징만
score = importance[i] * abs(feature_values[i]) * 10
# 특징명을 한글로 변환
readable_name = self._translate_feature_name(fname)
contributions[readable_name] = min(round(score, 1), 100)
# 상위 6개만 반환
sorted_factors = sorted(contributions.items(), key=lambda x: x[1], reverse=True)[:6]
return dict(sorted_factors)
def _translate_feature_name(self, fname: str) -> str:
"""특징명을 읽기 쉬운 형태로 변환"""
translations = {
'sales_avg': '매출',
'trend_slope': '매출 추세',
'trend_consecutive_down': '연속 하락',
'customer_reuse_rate': '재이용률',
'volatility_cv': '매출 변동성',
'operation_months': '영업 기간',
'sales_recent_vs_previous': '최근 매출 변화'
}
for key, value in translations.items():
if key in fname:
return value
return fname
def _generate_action_items(self, result: Dict, store_data: Dict) -> List[str]:
"""액션 아이템 생성"""
actions = []
risk_score = result['risk_score']
if risk_score > 70:
actions.append("즉시 조치 필요: 비용 절감 및 매출 증대 방안 마련")
actions.append("현금흐름 개선: 외상 매출 회수 및 재고 최적화")
actions.append("전문가 상담: 경영 컨설팅 및 구조조정 검토")
elif risk_score > 40:
actions.append("매출 분석: 주력 상품/서비스 재점검")
actions.append("마케팅 강화: 신규 고객 유치 캠페인")
actions.append("차별화 전략: 경쟁력 있는 요소 발굴 및 강화")
else:
actions.append("현재 상태 유지: 정기적인 모니터링 지속")
actions.append("성장 기회 탐색: 추가 매출원 발굴")
actions.append("고객 충성도 강화: 멤버십 프로그램 등")
return actions
def _interpret_prediction(self, result: Dict) -> str:
"""예측 결과 해석"""
risk_level = result['risk_level']
risk_score = result['risk_score']
if risk_level == '높음':
return f"위험도가 매우 높습니다 ({risk_score:.1f}점). 폐업 위험이 높으므로 즉각적인 대응이 필요합니다."
elif risk_level == '보통':
return f"주의가 필요합니다 ({risk_score:.1f}점). 개선 방안을 마련하여 위험을 줄이세요."
else:
return f"안정적입니다 ({risk_score:.1f}점). 현재의 운영 방식을 유지하면서 지속적으로 모니터링하세요."
def get_model_info(self) -> Dict:
"""모델 정보 반환"""
return {
'version': self.config.get('model_version', '2.0'),
'n_features': self.config.get('n_features', 0),
'performance': self.config.get('performance', {}),
'ensemble_weights': self.config.get('ensemble_weights', []),
'models': {
'xgboost': self.xgb_model is not None,
'lightgbm': self.lgb_model is not None,
'catboost': self.catboost_model is not None
}
}
if __name__ == "__main__":
# 사용 예시
print("=" * 70)
print("Early Warning Predictor v2.0 테스트")
print("=" * 70)
# 모델 로드
predictor = EarlyWarningPredictor(model_path='../model')
try:
predictor.load_model()
# 테스트 데이터
store_data = {
'store_id': 'TEST_001',
'industry': '카페',
'location': '서울 강남구',
'avg_sales': 45,
'reuse_rate': 22.5,
'operating_months': 18,
'sales_trend': -0.05
}
# 예측
result = predictor.predict(store_data)
print("\n예측 결과:")
print(f" 위험도 점수: {result['risk_score']}/100")
print(f" 위험 등급: {result['risk_level']}")
print(f" 폐업 확률: {result['closure_probability']:.1%}")
if 'risk_factors' in result:
print("\n주요 위험 요인:")
for factor, score in result['risk_factors'].items():
print(f" - {factor}: {score:.1f}점")
print("\n액션 아이템:")
for action in result['action_items']:
print(f" {action}")
except FileNotFoundError:
print("모델 파일이 없습니다. 먼저 모델을 학습해주세요.")