import os import sys import argparse import numpy as np import tensorflow as tf from sklearn.model_selection import train_test_split sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.model import create_malconv_model from src.utils import ( configure_gpu_memory, plot_training_history, evaluate_model, get_file_paths_and_labels, data_generator, read_binary_file ) def train_malconv(data_source, epochs=10, batch_size=256, max_length=2_000_000, validation_split=0.2, save_path="models/malconv_model.h5"): """ MalConv 모델 훈련 (데이터 제너레이터 사용) Args: data_source: (malware_dir, benign_dir) 튜플 epochs: 훈련 에포크 수 batch_size: 배치 크기 max_length: 최대 입력 길이 (2MB) validation_split: 검증 데이터 비율 save_path: 모델 저장 경로 """ print("=" * 60) print("MalConv 모델 훈련 시작 (데이터 제너레이터 모드)") print("=" * 60) # GPU 설정 configure_gpu_memory() # 데이터 경로 및 레이블 로딩 if isinstance(data_source, tuple) and len(data_source) == 2: malware_dir, benign_dir = data_source filepaths, labels = get_file_paths_and_labels(malware_dir, benign_dir) else: raise ValueError("data_source는 (malware_dir, benign_dir) 튜플이어야 합니다.") # 훈련/검증 분할 (파일 경로 기준) filepaths_train, filepaths_val, labels_train, labels_val = train_test_split( filepaths, labels, test_size=validation_split, random_state=42, stratify=labels ) print(f"총 데이터: {len(filepaths)}") print(f"훈련 데이터: {len(filepaths_train)}, 검증 데이터: {len(filepaths_val)}") # 데이터 제너레이터 생성 train_gen = data_generator(filepaths_train, labels_train, batch_size, max_length) val_gen = data_generator(filepaths_val, labels_val, batch_size, max_length, shuffle=False) # 검증 시에는 셔플 안함 # 모델 생성 print("MalConv 모델 생성 중...") model = create_malconv_model(max_length) # 더미 입력으로 모델 빌드 dummy_input = np.zeros((1, max_length), dtype=np.uint8) _ = model(dummy_input) print("\n=== 모델 아키텍처 ===") model.summary() print(f"총 파라미터 수: {model.count_params():,}") # 콜백 설정 callbacks = [ tf.keras.callbacks.EarlyStopping( monitor='val_loss', patience=5, # 참을성 증가 restore_best_weights=True, verbose=1 ), tf.keras.callbacks.ModelCheckpoint( save_path, monitor='val_auc', save_best_only=True, verbose=1, mode='max' # AUC는 높을수록 좋음 ) ] # 훈련 print(f"\n=== 훈련 시작 ===") print(f"배치 크기: {batch_size}") print(f"에포크: {epochs}") history = model.fit( train_gen, steps_per_epoch=len(filepaths_train) // batch_size, epochs=epochs, validation_data=val_gen, validation_steps=len(filepaths_val) // batch_size, callbacks=callbacks, verbose=1 ) # 평가 (메모리 문제로 검증 데이터의 일부만 사용) print("\n=== 최종 평가 ===") num_eval_samples = min(len(filepaths_val), 1024) # 평가 샘플 수 제한 X_eval = np.array([read_binary_file(fp, max_length) for fp in filepaths_val[:num_eval_samples]]) y_eval = np.array(labels_val[:num_eval_samples]) if X_eval.size > 0: results = evaluate_model(model, X_eval, y_eval, batch_size=batch_size//2) else: print("평가할 데이터가 없습니다.") results = {} # 시각화 plot_training_history(history) print(f"\n모델이 저장되었습니다: {save_path}") return model, history, results def main(): parser = argparse.ArgumentParser(description='MalConv 모델 훈련') # 데이터 소스 옵션 parser.add_argument('--malware_dir', required=True, help='악성코드 디렉토리') parser.add_argument('--benign_dir', required=True, help='정상파일 디렉토리') # 훈련 옵션 parser.add_argument('--epochs', type=int, default=20, help='에포크 수') # 에포크 증가 parser.add_argument('--batch_size', type=int, default=64, help='배치 크기') # 배치 크기 조정 parser.add_argument('--max_length', type=int, default=2_000_000, help='최대 입력 길이') parser.add_argument('--save_path', default='models/malconv_model.h5', help='모델 저장 경로') args = parser.parse_args() data_source = (args.malware_dir, args.benign_dir) # 저장 디렉토리 생성 os.makedirs(os.path.dirname(args.save_path), exist_ok=True) # 모델 훈련 train_malconv( data_source=data_source, epochs=args.epochs, batch_size=args.batch_size, max_length=args.max_length, save_path=args.save_path ) if __name__ == "__main__": main()