# モデル学習スクリプト import os import pandas as pd import numpy as np import matplotlib.pyplot as plt from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.ensemble import RandomForestClassifier from sklearn.linear_model import LogisticRegression from sklearn.naive_bayes import MultinomialNB from sklearn.svm import LinearSVC from sklearn.pipeline import Pipeline from sklearn.metrics import classification_report, confusion_matrix, accuracy_score import joblib import time # データとモデルのパス DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data") MODELS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "models") os.makedirs(MODELS_DIR, exist_ok=True) # モデルのパス MODEL_PATH = os.path.join(MODELS_DIR, "email_classifier.pkl") def train_and_evaluate_model(data_path): """ メール分類モデルを学習して評価 Args: data_path: トレーニングデータのCSVファイルパス Returns: best_model: 学習済みの最良モデル """ print(f"データを {data_path} から読み込み中...") try: df = pd.read_csv(data_path) except Exception as e: print(f"データ読み込みエラー: {e}") return None print(f"データ読み込み完了。件数: {len(df)}件") print(f" - 正当な問い合わせ: {sum(df['label'] == 0)}件") print(f" - 営業・スパム: {sum(df['label'] == 1)}件") # データの準備 X = df['email_text'].values y = df['label'].values # トレーニングデータとテストデータに分割 X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=42, stratify=y ) print("モデルの学習と評価を実施中...") # 候補モデルの定義 models = [ { 'name': 'RandomForest', 'pipeline': Pipeline([ ('vectorizer', TfidfVectorizer(max_features=5000, ngram_range=(1, 2))), ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)) ]) }, { 'name': 'LogisticRegression', 'pipeline': Pipeline([ ('vectorizer', TfidfVectorizer(max_features=5000, ngram_range=(1, 2))), ('classifier', LogisticRegression(random_state=42, max_iter=1000)) ]) }, { 'name': 'NaiveBayes', 'pipeline': Pipeline([ ('vectorizer', TfidfVectorizer(max_features=5000, ngram_range=(1, 2))), ('classifier', MultinomialNB()) ]) }, { 'name': 'LinearSVC', 'pipeline': Pipeline([ ('vectorizer', TfidfVectorizer(max_features=5000, ngram_range=(1, 2))), ('classifier', LinearSVC(random_state=42, max_iter=10000)) ]) } ] # 各モデルを評価 results = [] for model in models: name = model['name'] pipeline = model['pipeline'] print(f" {name} を学習中...") start_time = time.time() # モデルのフィット pipeline.fit(X_train, y_train) # 予測 y_pred = pipeline.predict(X_test) # 評価メトリクス accuracy = accuracy_score(y_test, y_pred) report = classification_report(y_test, y_pred, output_dict=True) train_time = time.time() - start_time # 結果保存 results.append({ 'name': name, 'pipeline': pipeline, 'accuracy': accuracy, 'precision': report['weighted avg']['precision'], 'recall': report['weighted avg']['recall'], 'f1': report['weighted avg']['f1-score'], 'train_time': train_time }) print(f" {name} - 精度: {accuracy:.4f}, F1: {report['weighted avg']['f1-score']:.4f}, 学習時間: {train_time:.2f}秒") # 最良モデルの選定(F1スコアで比較) results.sort(key=lambda x: x['f1'], reverse=True) best_model = results[0] print(f"\n最良モデル: {best_model['name']}") print(f" - 精度: {best_model['accuracy']:.4f}") print(f" - 適合率: {best_model['precision']:.4f}") print(f" - 再現率: {best_model['recall']:.4f}") print(f" - F1スコア: {best_model['f1']:.4f}") # 学習結果の詳細を表示 print("\n詳細な分類レポート:") y_pred = best_model['pipeline'].predict(X_test) print(classification_report(y_test, y_pred)) # 混同行列の表示 cm = confusion_matrix(y_test, y_pred) print("\n混同行列:") print(cm) # モデルの保存 print(f"\nモデルを {MODEL_PATH} に保存中...") joblib.dump(best_model['pipeline'], MODEL_PATH) print("モデル保存完了") return best_model['pipeline'] def plot_importance(model_path): """ 特徴量重要度のプロット Args: model_path: 学習済みモデルのパス """ # モデルの読み込み pipeline = joblib.load(model_path) # モデルタイプ別の特徴量重要度の取得 feature_names = pipeline.named_steps['vectorizer'].get_feature_names_out() classifier = pipeline.named_steps['classifier'] # 重要度の抽出 importance_scores = None if hasattr(classifier, 'feature_importances_'): # RandomForestの場合 importance_scores = classifier.feature_importances_ elif hasattr(classifier, 'coef_'): # 線形モデルの場合 if len(classifier.coef_.shape) > 1: # 多クラス分類の場合、絶対値の平均を取る importance_scores = np.abs(classifier.coef_).mean(axis=0) else: # 二項分類の場合 importance_scores = np.abs(classifier.coef_[0]) if importance_scores is not None: # 上位20個の特徴量を表示 indices = np.argsort(importance_scores)[-20:] plt.figure(figsize=(10, 8)) plt.title('Top 20 Feature Importances') plt.barh(range(20), importance_scores[indices]) plt.yticks(range(20), [feature_names[i] for i in indices]) plt.tight_layout() # 保存 plot_path = os.path.join(MODELS_DIR, "feature_importance.png") plt.savefig(plot_path) print(f"特徴量重要度のプロットを {plot_path} に保存しました。") if __name__ == "__main__": # サンプルデータファイルパス data_path = os.path.join(DATA_DIR, "sample_emails.csv") # データファイルが存在するか確認 if not os.path.exists(data_path): print(f"データファイル {data_path} が見つかりません。") print("data_generator.pyを先に実行してください。") else: # モデルのトレーニングと評価 model = train_and_evaluate_model(data_path) # モデルが正常に学習された場合、特徴量重要度をプロット if model is not None and os.path.exists(MODEL_PATH): plot_importance(MODEL_PATH)