Spaces:
Sleeping
Sleeping
File size: 7,383 Bytes
7be094d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# モデル学習スクリプト
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import joblib
import time
# データとモデルのパス
DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data")
MODELS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "models")
os.makedirs(MODELS_DIR, exist_ok=True)
# モデルのパス
MODEL_PATH = os.path.join(MODELS_DIR, "email_classifier.pkl")
def train_and_evaluate_model(data_path):
"""
メール分類モデルを学習して評価
Args:
data_path: トレーニングデータのCSVファイルパス
Returns:
best_model: 学習済みの最良モデル
"""
print(f"データを {data_path} から読み込み中...")
try:
df = pd.read_csv(data_path)
except Exception as e:
print(f"データ読み込みエラー: {e}")
return None
print(f"データ読み込み完了。件数: {len(df)}件")
print(f" - 正当な問い合わせ: {sum(df['label'] == 0)}件")
print(f" - 営業・スパム: {sum(df['label'] == 1)}件")
# データの準備
X = df['email_text'].values
y = df['label'].values
# トレーニングデータとテストデータに分割
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
print("モデルの学習と評価を実施中...")
# 候補モデルの定義
models = [
{
'name': 'RandomForest',
'pipeline': Pipeline([
('vectorizer', TfidfVectorizer(max_features=5000, ngram_range=(1, 2))),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
},
{
'name': 'LogisticRegression',
'pipeline': Pipeline([
('vectorizer', TfidfVectorizer(max_features=5000, ngram_range=(1, 2))),
('classifier', LogisticRegression(random_state=42, max_iter=1000))
])
},
{
'name': 'NaiveBayes',
'pipeline': Pipeline([
('vectorizer', TfidfVectorizer(max_features=5000, ngram_range=(1, 2))),
('classifier', MultinomialNB())
])
},
{
'name': 'LinearSVC',
'pipeline': Pipeline([
('vectorizer', TfidfVectorizer(max_features=5000, ngram_range=(1, 2))),
('classifier', LinearSVC(random_state=42, max_iter=10000))
])
}
]
# 各モデルを評価
results = []
for model in models:
name = model['name']
pipeline = model['pipeline']
print(f" {name} を学習中...")
start_time = time.time()
# モデルのフィット
pipeline.fit(X_train, y_train)
# 予測
y_pred = pipeline.predict(X_test)
# 評価メトリクス
accuracy = accuracy_score(y_test, y_pred)
report = classification_report(y_test, y_pred, output_dict=True)
train_time = time.time() - start_time
# 結果保存
results.append({
'name': name,
'pipeline': pipeline,
'accuracy': accuracy,
'precision': report['weighted avg']['precision'],
'recall': report['weighted avg']['recall'],
'f1': report['weighted avg']['f1-score'],
'train_time': train_time
})
print(f" {name} - 精度: {accuracy:.4f}, F1: {report['weighted avg']['f1-score']:.4f}, 学習時間: {train_time:.2f}秒")
# 最良モデルの選定(F1スコアで比較)
results.sort(key=lambda x: x['f1'], reverse=True)
best_model = results[0]
print(f"\n最良モデル: {best_model['name']}")
print(f" - 精度: {best_model['accuracy']:.4f}")
print(f" - 適合率: {best_model['precision']:.4f}")
print(f" - 再現率: {best_model['recall']:.4f}")
print(f" - F1スコア: {best_model['f1']:.4f}")
# 学習結果の詳細を表示
print("\n詳細な分類レポート:")
y_pred = best_model['pipeline'].predict(X_test)
print(classification_report(y_test, y_pred))
# 混同行列の表示
cm = confusion_matrix(y_test, y_pred)
print("\n混同行列:")
print(cm)
# モデルの保存
print(f"\nモデルを {MODEL_PATH} に保存中...")
joblib.dump(best_model['pipeline'], MODEL_PATH)
print("モデル保存完了")
return best_model['pipeline']
def plot_importance(model_path):
"""
特徴量重要度のプロット
Args:
model_path: 学習済みモデルのパス
"""
# モデルの読み込み
pipeline = joblib.load(model_path)
# モデルタイプ別の特徴量重要度の取得
feature_names = pipeline.named_steps['vectorizer'].get_feature_names_out()
classifier = pipeline.named_steps['classifier']
# 重要度の抽出
importance_scores = None
if hasattr(classifier, 'feature_importances_'):
# RandomForestの場合
importance_scores = classifier.feature_importances_
elif hasattr(classifier, 'coef_'):
# 線形モデルの場合
if len(classifier.coef_.shape) > 1:
# 多クラス分類の場合、絶対値の平均を取る
importance_scores = np.abs(classifier.coef_).mean(axis=0)
else:
# 二項分類の場合
importance_scores = np.abs(classifier.coef_[0])
if importance_scores is not None:
# 上位20個の特徴量を表示
indices = np.argsort(importance_scores)[-20:]
plt.figure(figsize=(10, 8))
plt.title('Top 20 Feature Importances')
plt.barh(range(20), importance_scores[indices])
plt.yticks(range(20), [feature_names[i] for i in indices])
plt.tight_layout()
# 保存
plot_path = os.path.join(MODELS_DIR, "feature_importance.png")
plt.savefig(plot_path)
print(f"特徴量重要度のプロットを {plot_path} に保存しました。")
if __name__ == "__main__":
# サンプルデータファイルパス
data_path = os.path.join(DATA_DIR, "sample_emails.csv")
# データファイルが存在するか確認
if not os.path.exists(data_path):
print(f"データファイル {data_path} が見つかりません。")
print("data_generator.pyを先に実行してください。")
else:
# モデルのトレーニングと評価
model = train_and_evaluate_model(data_path)
# モデルが正常に学習された場合、特徴量重要度をプロット
if model is not None and os.path.exists(MODEL_PATH):
plot_importance(MODEL_PATH) |