import os import numpy as np import pandas as pd from sklearn.model_selection import train_test_split from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, balanced_accuracy_score import matplotlib.pyplot as plt import seaborn as sns # utils.py에 누락된 import import tensorflow as tf # configure_gpu_memory 함수에서 필요 def read_binary_file(file_path, max_length=2_000_000): """ 바이너리 파일을 읽어 정수 배열로 변환 논문 사양: 2MB까지 처리 """ try: with open(file_path, 'rb') as f: raw_bytes = f.read() # 바이트를 0-255 정수로 변환 byte_array = np.frombuffer(raw_bytes, dtype=np.uint8) if len(byte_array) > max_length: # 긴 파일: 앞 2MB만 사용 (논문 방식) return byte_array[:max_length] else: # 짧은 파일: 0으로 패딩 padded = np.zeros(max_length, dtype=np.uint8) padded[:len(byte_array)] = byte_array return padded except Exception as e: print(f"파일 읽기 오류 {file_path}: {e}") return np.zeros(max_length, dtype=np.uint8) def load_dataset_from_directory(malware_dir, benign_dir, max_length=2_000_000, max_samples_per_class=None): """ 디렉토리에서 직접 바이너리 파일들을 로드 Args: malware_dir: 악성코드 파일들이 있는 디렉토리 benign_dir: 정상 파일들이 있는 디렉토리 max_length: 최대 바이트 길이 max_samples_per_class: 클래스당 최대 샘플 수 """ X, y = [], [] # 악성코드 파일 로드 if os.path.exists(malware_dir): malware_files = [f for f in os.listdir(malware_dir) if os.path.isfile(os.path.join(malware_dir, f))] if max_samples_per_class: malware_files = malware_files[:max_samples_per_class] print(f"악성코드 파일 로딩 중... ({len(malware_files)}개)") for i, filename in enumerate(malware_files): file_path = os.path.join(malware_dir, filename) byte_array = read_binary_file(file_path, max_length) X.append(byte_array) y.append(0) # 악성코드 = 0 if (i + 1) % 100 == 0: print(f" {i + 1}/{len(malware_files)} 처리 완료") # 정상 파일 로드 if os.path.exists(benign_dir): benign_files = [f for f in os.listdir(benign_dir) if os.path.isfile(os.path.join(benign_dir, f))] if max_samples_per_class: benign_files = benign_files[:max_samples_per_class] print(f"정상 파일 로딩 중... ({len(benign_files)}개)") for i, filename in enumerate(benign_files): file_path = os.path.join(benign_dir, filename) byte_array = read_binary_file(file_path, max_length) X.append(byte_array) y.append(1) # 정상 = 1 if (i + 1) % 100 == 0: print(f" {i + 1}/{len(benign_files)} 처리 완료") X = np.array(X) y = np.array(y) print(f"\n데이터셋 로딩 완료:") print(f" 총 샘플: {len(X)}") print(f" 악성코드: {np.sum(y == 0)}") print(f" 정상파일: {np.sum(y == 1)}") return X, y def load_dataset_from_csv(csv_path, max_length=2_000_000): """CSV 파일에서 데이터셋 로드""" df = pd.read_csv(csv_path) X, y = [], [] print("CSV에서 파일 로딩 중...") for idx, row in df.iterrows(): file_path = row['filepath'] label = row['label'] if os.path.exists(file_path): byte_array = read_binary_file(file_path, max_length) X.append(byte_array) y.append(label) else: print(f"파일을 찾을 수 없습니다: {file_path}") if (idx + 1) % 1000 == 0: print(f" {idx + 1} 파일 처리 완료") return np.array(X), np.array(y) def configure_gpu_memory(): """GPU 메모리 설정""" gpus = tf.config.experimental.list_physical_devices('GPU') if gpus: try: for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True) print(f"GPU 설정 완료: {len(gpus)}개 GPU 사용") return True except RuntimeError as e: print(f"GPU 설정 오류: {e}") return False def plot_training_history(history): """훈련 히스토리 시각화""" fig, axes = plt.subplots(2, 2, figsize=(12, 10)) # Loss axes[0, 0].plot(history.history['loss'], label='Training Loss') if 'val_loss' in history.history: axes[0, 0].plot(history.history['val_loss'], label='Validation Loss') axes[0, 0].set_title('Model Loss') axes[0, 0].set_xlabel('Epoch') axes[0, 0].set_ylabel('Loss') axes[0, 0].legend() axes[0, 0].grid(True) # Accuracy axes[0, 1].plot(history.history['accuracy'], label='Training Accuracy') if 'val_accuracy' in history.history: axes[0, 1].plot(history.history['val_accuracy'], label='Validation Accuracy') axes[0, 1].set_title('Model Accuracy') axes[0, 1].set_xlabel('Epoch') axes[0, 1].set_ylabel('Accuracy') axes[0, 1].legend() axes[0, 1].grid(True) # AUC if 'auc' in history.history: axes[1, 0].plot(history.history['auc'], label='Training AUC') if 'val_auc' in history.history: axes[1, 0].plot(history.history['val_auc'], label='Validation AUC') axes[1, 0].set_title('Model AUC') axes[1, 0].set_xlabel('Epoch') axes[1, 0].set_ylabel('AUC') axes[1, 0].legend() axes[1, 0].grid(True) # Learning Rate if 'lr' in history.history: axes[1, 1].plot(history.history['lr'], label='Learning Rate', color='red') axes[1, 1].set_title('Learning Rate Schedule') axes[1, 1].set_xlabel('Epoch') axes[1, 1].set_ylabel('Learning Rate') axes[1, 1].set_yscale('log') axes[1, 1].legend() axes[1, 1].grid(True) plt.tight_layout() plt.show() def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"): """혼동 행렬 시각화""" cm = confusion_matrix(y_true, y_pred) plt.figure(figsize=(8, 6)) sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Malware', 'Benign'], yticklabels=['Malware', 'Benign']) plt.title(title) plt.ylabel('True Label') plt.xlabel('Predicted Label') plt.show() def evaluate_model(model, X_test, y_test, batch_size=16): """모델 성능 평가""" print("모델 평가 중...") # 예측 y_pred_prob = model.predict(X_test, batch_size=batch_size, verbose=1) y_pred = (y_pred_prob > 0.5).astype(int).flatten() # 메트릭 계산 accuracy = np.mean(y_pred == y_test) balanced_acc = balanced_accuracy_score(y_test, y_pred) auc_score = roc_auc_score(y_test, y_pred_prob) print(f"\n=== 평가 결과 ===") print(f"Accuracy: {accuracy:.4f}") print(f"Balanced Accuracy: {balanced_acc:.4f}") print(f"AUC Score: {auc_score:.4f}") print(f"\n분류 리포트:") print(classification_report(y_test, y_pred, target_names=['Malware', 'Benign'])) # 혼동 행렬 시각화 plot_confusion_matrix(y_test, y_pred, "MalConv Performance") return { 'accuracy': accuracy, 'balanced_accuracy': balanced_acc, 'auc': auc_score, 'predictions': y_pred_prob } def get_file_paths_and_labels(malware_dir, benign_dir, max_samples_per_class=None): """ 디렉토리에서 파일 경로와 레이블 목록을 가져옵니다. (메모리에 파일 로드 안함) """ filepaths = [] labels = [] # 악성코드 파일 경로 if os.path.exists(malware_dir): malware_files = [os.path.join(malware_dir, f) for f in os.listdir(malware_dir) if os.path.isfile(os.path.join(malware_dir, f))] if max_samples_per_class: malware_files = malware_files[:max_samples_per_class] filepaths.extend(malware_files) labels.extend([0] * len(malware_files)) # 악성코드 = 0 print(f"악성코드 파일 경로 로딩: {len(malware_files)}개") # 정상 파일 경로 if os.path.exists(benign_dir): benign_files = [os.path.join(benign_dir, f) for f in os.listdir(benign_dir) if os.path.isfile(os.path.join(benign_dir, f))] if max_samples_per_class: benign_files = benign_files[:max_samples_per_class] filepaths.extend(benign_files) labels.extend([1] * len(benign_files)) # 정상 = 1 print(f"정상 파일 경로 로딩: {len(benign_files)}개") print(f"\n총 파일 경로: {len(filepaths)}") print(f" 악성코드: {labels.count(0)}") print(f" 정상파일: {labels.count(1)}") # 데이터 순서 섞기 indices = np.arange(len(filepaths)) np.random.shuffle(indices) filepaths = np.array(filepaths)[indices].tolist() labels = np.array(labels)[indices] return filepaths, labels def data_generator(filepaths, labels, batch_size, max_length=2_000_000, shuffle=True): """ 데이터를 배치 단위로 생성하는 제너레이터 """ num_samples = len(filepaths) if num_samples == 0: return while True: indices = np.arange(num_samples) if shuffle: np.random.shuffle(indices) for i in range(0, num_samples, batch_size): batch_indices = indices[i:i+batch_size] X_batch = [] y_batch_list = [] for j in batch_indices: try: X_batch.append(read_binary_file(filepaths[j], max_length)) y_batch_list.append(labels[j]) except Exception as e: print(f"Warning: Skipping file {filepaths[j]} due to error: {e}") continue if not X_batch: continue yield np.array(X_batch), np.array(y_batch_list)