""" 시각화 함수 모듈 """ import numpy as np import pandas as pd import matplotlib.pyplot as plt import matplotlib.dates as mdates import networkx as nx import traceback from matplotlib.patches import Patch from sklearn.manifold import TSNE from sklearn.decomposition import PCA from node2vec import Node2Vec def clean_for_visualization(X): """ 시각화를 위한 간단한 데이터 정리 함수 """ if X is None: return X X = np.asarray(X, dtype=np.float32) # NaN과 무한값을 0으로 대체 X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0) return X def plot_training_history(history): """학습 과정의 손실과 학습률을 시각화""" fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5)) history_dict = history.history if hasattr(history, 'history') else history # 손실 그래프 ax1.plot(history_dict['loss'], label='Train Loss') ax1.plot(history_dict['val_loss'], label='Validation Loss') ax1.set_title('Model Loss') ax1.set_xlabel('Epoch') ax1.set_ylabel('Loss') ax1.legend() ax1.grid(True) # 학습률 그래프 ax2.plot(history_dict['learning_rate'], label='Learning Rate') ax2.set_title('Learning Rate') ax2.set_xlabel('Epoch') ax2.set_ylabel('Learning Rate') ax2.legend() ax2.grid(True) plt.tight_layout() return fig def plot_performance_grid(grid_results): """임계값별 성능 지표를 그리드로 시각화""" fig, axes = plt.subplots(2, 2, figsize=(9, 6)) # 1) Total Return for comm, res in grid_results.items(): ths = list(res.keys()) rets = [res[t]['total_return'] for t in ths] axes[0, 0].plot(ths, rets, label=f'Comm {comm*100:.2f}%') axes[0, 0].set_title('Total Return by Threshold') axes[0, 0].set_xlabel('Threshold') axes[0, 0].set_ylabel('Total Return') axes[0, 0].legend() axes[0, 0].grid(True) # 2) Sharpe Ratio for comm, res in grid_results.items(): ths = list(res.keys()) sps = [res[t]['sharpe_ratio'] for t in ths] axes[0, 1].plot(ths, sps, label=f'Comm {comm*100:.2f}%') axes[0, 1].set_title('Sharpe Ratio by Threshold') axes[0, 1].set_xlabel('Threshold') axes[0, 1].set_ylabel('Sharpe Ratio') axes[0, 1].legend() axes[0, 1].grid(True) # 3) Trade Count for comm, res in grid_results.items(): ths = list(res.keys()) tcs = [len(res[t].get('trades', [])) for t in ths] axes[1, 0].plot(ths, tcs, label=f'Comm {comm*100:.2f}%') axes[1, 0].set_title('Number of Trades') axes[1, 0].set_xlabel('Threshold') axes[1, 0].set_ylabel('Trades') axes[1, 0].legend() axes[1, 0].grid(True) # 4) Max Drawdown for comm, res in grid_results.items(): ths = list(res.keys()) mdds = [res[t]['max_drawdown'] for t in ths] axes[1, 1].plot(ths, mdds, label=f'Comm {comm*100:.2f}%') axes[1, 1].set_title('Max Drawdown by Threshold') axes[1, 1].set_xlabel('Threshold') axes[1, 1].set_ylabel('Max Drawdown') axes[1, 1].legend() axes[1, 1].grid(True) plt.tight_layout() return fig def plot_signal_distribution(y_pred, best_threshold): """예측 신호의 분포를 시각화""" fig, ax = plt.subplots(figsize=(12, 6)) # 매매 신호 분포 buy_signals = y_pred > best_threshold sell_signals = y_pred < -best_threshold hold_signals = (y_pred >= -best_threshold) & (y_pred <= best_threshold) ax.hist([y_pred[buy_signals], y_pred[sell_signals], y_pred[hold_signals]], bins=50, label=['Buy', 'Sell', 'Hold'], alpha=0.7) ax.axvline(x=best_threshold, color='r', linestyle='--', label=f'Buy Threshold ({best_threshold:.4f})') ax.axvline(x=-best_threshold, color='g', linestyle='--', label=f'Sell Threshold (-{best_threshold:.4f})') ax.set_xlabel('Predicted Returns') ax.set_ylabel('Frequency') ax.set_title('Distribution of Trading Signals') ax.legend() ax.grid(True) plt.tight_layout() return fig def plot_price_predictions(model, data_dict, best_threshold, ticker_encoder, x_test_clean=None): """ 모델의 예측 변동률을 기반으로 예측 종가를 계산하고 실제 종가와 함께 시각화 x_test_clean: 외부에서 정리된 테스트 데이터 (선택적) """ try: # 테스트 데이터 추출 x_test = data_dict['x_test'] ticker_test = data_dict['ticker_test'] data = data_dict['data'] time_diffs_test = data_dict.get('time_diffs_test') # 빈 데이터 확인 if len(x_test) == 0: print("테스트 데이터가 없습니다. 검증 데이터를 사용합니다.") x_test = data_dict['x_val'] ticker_test = data_dict['ticker_val'] time_diffs_test = data_dict.get('time_diffs_val') if len(x_test) == 0: print("시각화할 데이터가 없습니다.") return None # 데이터 정리 - 외부에서 전달된 경우 사용, 아니면 간단한 정리만 if x_test_clean is not None: x_test_processed = x_test_clean else: x_test_processed = clean_for_visualization(x_test) ticker_test_clean = np.asarray(ticker_test, dtype=np.int32) # 섹터/산업 데이터 처리 sector_test = data_dict.get('sector_test') industry_test = data_dict.get('industry_test') # 섹터/산업 정보가 없는 경우 x_test에서 추출 시도 (특성 59, 60이 해당) sector_feature_idx = 59 # 'Technology', 'Financial Services' 등 섹터 정보 industry_feature_idx = 60 # 'Semiconductors', 'Banks - Diversified' 등 산업 정보 if (sector_test is None or industry_test is None) and x_test_processed.shape[2] > max(sector_feature_idx, industry_feature_idx): sector_test = x_test_processed[:, -1, sector_feature_idx].astype(np.int32) industry_test = x_test_processed[:, -1, industry_feature_idx].astype(np.int32) # 더미 데이터 생성 (필요한 경우) if sector_test is None or industry_test is None: print("섹터/산업 정보가 없습니다. 더미 데이터를 생성합니다.") sector_test = np.zeros_like(ticker_test_clean) industry_test = np.zeros_like(ticker_test_clean) else: sector_test = np.asarray(sector_test, dtype=np.int32) industry_test = np.asarray(industry_test, dtype=np.int32) # 시간 간격 데이터 if time_diffs_test is None or len(time_diffs_test) == 0: print("시간 간격 데이터가 없습니다. 더미 데이터를 생성합니다.") time_diffs_test = np.ones((x_test_processed.shape[0], x_test_processed.shape[1]), dtype=np.float32) else: time_diffs_test = np.asarray(time_diffs_test, dtype=np.float32) # 예측 수행 (5개 입력 제공) test_preds = model.predict( [x_test_processed, ticker_test_clean, sector_test, industry_test, time_diffs_test], verbose=0 ) # 결과 형태 처리 if isinstance(test_preds, list): test_pred_values = test_preds[0].flatten() # value_output else: test_pred_values = test_preds.flatten() ticker_test_flat = ticker_test.flatten() if len(test_pred_values) != len(ticker_test_flat): # 시퀀스 길이(일반적으로 60)를 추정 seq_len = x_test_processed.shape[1] # 시퀀스 길이 # 예측값 배열이 (샘플 * 시퀀스 길이) 형태인 경우 if len(test_pred_values) == len(ticker_test_flat) * seq_len: # 각 시퀀스의 마지막 예측만 사용하도록 추출 test_pred_values = test_pred_values.reshape(-1, seq_len)[:, -1] else: # 다른 형태의 불일치인 경우, 각 시퀀스의 마지막 값만 사용 print(f"차원 불일치: 예측값={len(test_pred_values)}, 티커={len(ticker_test_flat)}") print("타임스텝 당 예측값을 추출하기 위해 샘플링 수행...") # 예측값의 길이를 티커 배열 길이에 맞추기 위한 샘플링 팩터 계산 factor = int(len(test_pred_values) / len(ticker_test_flat)) if factor > 1: # factor 간격으로 샘플링 test_pred_values = test_pred_values[factor-1::factor] print(f"샘플링 후 예측값 형상: {test_pred_values.shape}") # 고유 종목 ID 찾기 unique_tickers = np.unique(ticker_test) # 종목별 시각화 fig, axes = plt.subplots(len(unique_tickers), 1, figsize=(12, len(unique_tickers) * 3)) if len(unique_tickers) == 1: axes = [axes] for i, ticker_id in enumerate(unique_tickers): # 현재 종목의 테스트 데이터 필터링 ticker_mask = ticker_test_flat == ticker_id # 마스크와 예측값 차원 확인 if len(ticker_mask) != len(test_pred_values): print(f"티커 ID {ticker_id}에 대한 마스크 길이({len(ticker_mask)})와 예측값 길이({len(test_pred_values)})가 일치하지 않습니다.") continue ticker_indices = np.where(ticker_mask)[0] if len(ticker_indices) < 2: print(f"티커 ID {ticker_id}에 대한 샘플이 부족합니다.") continue # 현재 종목 이름 가져오기 ticker_name = ticker_encoder.inverse_transform([int(ticker_id)])[0] # 이 종목에 대한 예측값 ticker_preds = test_pred_values[ticker_mask] # 원본 데이터에서 이 종목의 실제 종가 가져오기 ticker_data = data[data['ticker'] == ticker_name].copy() ticker_data = ticker_data.sort_index() # 날짜순 정렬 # 적절한 인덱스 범위 확인 if len(ticker_data) < len(ticker_preds) + 1: print(f"티커 {ticker_name}의 데이터가 충분하지 않습니다: {len(ticker_data)} < {len(ticker_preds) + 1}") continue # 실제 종가 데이터 추출 actual_prices = ticker_data['Close'].values[-len(ticker_preds)-1:] # 하루 더 이전부터 가져옴 dates = ticker_data.index[-len(ticker_preds)-1:] # 날짜도 함께 가져옴 # 예측 종가 계산 (예측된 로그 수익률에서 실제 가격으로 변환) predicted_prices = [] last_price = actual_prices[0] # 첫 번째 실제 가격으로 시작 for j, pred in enumerate(ticker_preds): # 로그 수익률에서 일반 수익률로 변환: r = exp(log_r) - 1 predicted_return = np.exp(pred) - 1 # 수익률을 가격으로 변환 predicted_price = last_price * (1 + predicted_return) predicted_prices.append(predicted_price) # 다음 예측을 위해 실제 마지막 가격 업데이트 last_price = actual_prices[j+1] # 실제 종가와 예측 종가 그리기 axes[i].plot(dates[1:], actual_prices[1:], 'b-', label='Actual Price', linewidth=2) axes[i].plot(dates[1:], predicted_prices, 'r--', label='Predicted Price', linewidth=2) # 포지션 상태 추적 로직 - 백테스트와 동일한 방식으로 구현 positions = np.zeros(len(ticker_preds)) current_position = 0 # 실제 백테스트 로직처럼 포지션 추적 for j, pred in enumerate(ticker_preds): # 신호 결정 (백테스트 로직과 동일) new_signal = 1 if pred > best_threshold else (-1 if pred < -best_threshold else 0) # 포지션 변경시에만 업데이트 if new_signal != current_position: current_position = new_signal # 현재 포지션 저장 positions[j] = current_position # 포지션 타입별로 데이터 분리 long_indices = positions == 1 short_indices = positions == -1 neutral_indices = positions == 0 # 색상 및 마커 설정 long_color = 'green' short_color = 'red' neutral_color = 'gray' # 각 포지션 상태별로 마커 표시 if any(long_indices): axes[i].scatter(dates[1:][long_indices], actual_prices[1:][long_indices], marker='^', color=long_color, s=100, label='Long Position') if any(short_indices): axes[i].scatter(dates[1:][short_indices], actual_prices[1:][short_indices], marker='v', color=short_color, s=100, label='Short Position') if any(neutral_indices): axes[i].scatter(dates[1:][neutral_indices], actual_prices[1:][neutral_indices], marker='o', color=neutral_color, s=50, label='Neutral Position') # 색상으로 배경 표시 (선택적) for j in range(1, len(positions)): if positions[j] != positions[j-1] or j == 1: # 포지션 변경 또는 첫 포지션 start_idx = j pos_type = positions[j] # 같은 포지션이 끝나는 지점 찾기 end_idx = start_idx while end_idx < len(positions) and positions[end_idx] == pos_type: end_idx += 1 # 배경색 설정 if pos_type == 1: # 롱 포지션 axes[i].axvspan(dates[1:][start_idx], dates[1:][min(end_idx, len(dates)-2)], alpha=0.1, color=long_color) elif pos_type == -1: # 숏 포지션 axes[i].axvspan(dates[1:][start_idx], dates[1:][min(end_idx, len(dates)-2)], alpha=0.1, color=short_color) # 그래프 설정 axes[i].set_title(f'{ticker_name} - Actual vs. Predicted Price with Position Changes') axes[i].set_xlabel('Date') axes[i].set_ylabel('Price') axes[i].legend() axes[i].grid(True, alpha=0.3) # 날짜 형식 설정 axes[i].xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d')) axes[i].xaxis.set_major_locator(mdates.WeekdayLocator(interval=2)) plt.tight_layout() return fig except Exception as e: print(f"Price prediction visualization failed: {e}") import traceback traceback.print_exc() return None def plot_graph_embeddings(sector_industry_df, save_path_tsne=None, save_path_pca=None): """ 섹터-산업 데이터로부터 그래프 생성 → 임베딩 → 시각화까지 통합 수행 """ # 그래프 생성 G = nx.Graph() tickers = sector_industry_df['ticker'].tolist() # 노드 추가 for ticker in tickers: sector = sector_industry_df[sector_industry_df['ticker'] == ticker]['sector'].iloc[0] industry = sector_industry_df[sector_industry_df['ticker'] == ticker]['industry'].iloc[0] G.add_node(ticker, sector=sector, industry=industry) # 엣지 추가 (섹터/산업 기반) for i, ticker1 in enumerate(tickers): for j, ticker2 in enumerate(tickers[i+1:], i+1): sector1 = sector_industry_df[sector_industry_df['ticker'] == ticker1]['sector'].iloc[0] sector2 = sector_industry_df[sector_industry_df['ticker'] == ticker2]['sector'].iloc[0] industry1 = sector_industry_df[sector_industry_df['ticker'] == ticker1]['industry'].iloc[0] industry2 = sector_industry_df[sector_industry_df['ticker'] == ticker2]['industry'].iloc[0] weight = 0 if sector1 == sector2: weight += 0.5 if industry1 == industry2: weight += 0.3 if weight > 0.3: G.add_edge(ticker1, ticker2, weight=weight) # print(f"그래프 생성: {G.number_of_nodes()}개 노드, {G.number_of_edges()}개 엣지") # 임베딩 생성 try: node2vec = Node2Vec(G, dimensions=64, walk_length=30, num_walks=200, workers=4) model = node2vec.fit(window=10, min_count=1, batch_words=4) embeddings = {} for node in G.nodes(): try: embeddings[node] = model.wv[node] except KeyError: embeddings[node] = np.random.normal(0, 0.1, 64) # print("Node2Vec 임베딩 생성 완료") except ImportError: print("Node2Vec 없음. PCA 기반 임베딩 사용") adj_matrix = nx.adjacency_matrix(G).todense() pca = PCA(n_components=min(64, adj_matrix.shape[0])) embeddings_matrix = pca.fit_transform(adj_matrix) embeddings = {} for i, node in enumerate(G.nodes()): embeddings[node] = embeddings_matrix[i] if i < embeddings_matrix.shape[0] else np.random.normal(0, 0.1, 64) # 시각화 (t-SNE + PCA) embedding_matrix = np.array([embeddings[ticker] for ticker in tickers]) # 섹터별 색상 sectors = sector_industry_df['sector'].unique() sector_colors = dict(zip(sectors, plt.cm.Set3(np.linspace(0, 1, len(sectors))))) # t-SNE와 PCA 동시 시각화 fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20, 8)) # t-SNE tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(tickers)//3)) coords_tsne = tsne.fit_transform(embedding_matrix) for i, ticker in enumerate(tickers): sector = sector_industry_df[sector_industry_df['ticker'] == ticker]['sector'].iloc[0] color = sector_colors[sector] ax1.scatter(coords_tsne[i, 0], coords_tsne[i, 1], c=[color], s=100, alpha=0.7, edgecolors='black') ax1.annotate(ticker, (coords_tsne[i, 0], coords_tsne[i, 1]), xytext=(5, 5), textcoords='offset points', fontweight='bold') ax1.set_title('Graph Embedding Visualization (t-SNE)', fontsize=14, fontweight='bold') ax1.grid(True, alpha=0.3) # PCA pca = PCA(n_components=2) coords_pca = pca.fit_transform(embedding_matrix) for i, ticker in enumerate(tickers): sector = sector_industry_df[sector_industry_df['ticker'] == ticker]['sector'].iloc[0] color = sector_colors[sector] ax2.scatter(coords_pca[i, 0], coords_pca[i, 1], c=[color], s=100, alpha=0.7, edgecolors='black') ax2.annotate(ticker, (coords_pca[i, 0], coords_pca[i, 1]), xytext=(5, 5), textcoords='offset points', fontweight='bold') ax2.set_title('Graph Embedding Visualization (PCA)', fontsize=14, fontweight='bold') ax2.grid(True, alpha=0.3) # 범례 (공통) legend_elements = [Patch(color=color, label=sector) for sector, color in sector_colors.items()] ax2.legend(handles=legend_elements, loc='upper right') plt.tight_layout() # 저장 if save_path_tsne and save_path_pca: # 개별 저장 fig1, ax_tsne = plt.subplots(figsize=(12, 10)) for i, ticker in enumerate(tickers): sector = sector_industry_df[sector_industry_df['ticker'] == ticker]['sector'].iloc[0] color = sector_colors[sector] ax_tsne.scatter(coords_tsne[i, 0], coords_tsne[i, 1], c=[color], s=100, alpha=0.7, edgecolors='black') ax_tsne.annotate(ticker, (coords_tsne[i, 0], coords_tsne[i, 1]), xytext=(5, 5), textcoords='offset points', fontweight='bold') ax_tsne.set_title('Graph Embedding Visualization (t-SNE)', fontsize=16, fontweight='bold') ax_tsne.grid(True, alpha=0.3) ax_tsne.legend(handles=legend_elements, loc='upper right') plt.tight_layout() plt.savefig(save_path_tsne, dpi=300, bbox_inches='tight', facecolor='white') plt.close(fig1) fig2, ax_pca = plt.subplots(figsize=(12, 10)) for i, ticker in enumerate(tickers): sector = sector_industry_df[sector_industry_df['ticker'] == ticker]['sector'].iloc[0] color = sector_colors[sector] ax_pca.scatter(coords_pca[i, 0], coords_pca[i, 1], c=[color], s=100, alpha=0.7, edgecolors='black') ax_pca.annotate(ticker, (coords_pca[i, 0], coords_pca[i, 1]), xytext=(5, 5), textcoords='offset points', fontweight='bold') ax_pca.set_title('Graph Embedding Visualization (PCA)', fontsize=16, fontweight='bold') ax_pca.grid(True, alpha=0.3) ax_pca.legend(handles=legend_elements, loc='upper right') plt.tight_layout() plt.savefig(save_path_pca, dpi=300, bbox_inches='tight', facecolor='white') plt.close(fig2) # print(f"t-SNE 저장: {save_path_tsne}") # print(f"PCA 저장: {save_path_pca}") plt.close() return coords_tsne, coords_pca