""" 데이터 로딩 및 전처리 모듈 """ import pandas as pd import numpy as np from sklearn.preprocessing import LabelEncoder from .time_utils import hermite_cubic_spline, calculate_time_derivative from .normalize import tanh_scale def process_data(data, use_spline=False, n_interpolation_points=5): """ 주식 데이터 전처리 함수 """ # 티커를 숫자 ID로 변환 ticker_encoder = LabelEncoder() all_tickers = data['ticker'].unique() ticker_encoder.fit(all_tickers) data['ticker_id'] = ticker_encoder.transform(data['ticker']) # 결측치 처리 data = data.fillna(method='ffill') # 티커별 데이터 저장 (원본 및 스플라인 보간 결과) ticker_data = {} for ticker in all_tickers: ticker_df = data[data['ticker'] == ticker].copy() ticker_data[ticker] = ticker_df # 스플라인 보간 적용 (옵션) if use_spline and n_interpolation_points > 0: # 날짜를 숫자 형태로 변환 (타임스탬프) time_points = ticker_df.index.astype(np.int64) // 10**9 # 초 단위로 변환 # 수치형 데이터 컬럼만 선택 numeric_cols = ticker_df.select_dtypes(include=[np.number]).columns numeric_data = ticker_df[numeric_cols].values # 에르미트 큐빅 스플라인 보간 적용 interpolated_data, interp_times = hermite_cubic_spline( numeric_data, n_interpolation_points=n_interpolation_points, time_points=time_points ) # 보간 결과 저장 ticker_data[f"{ticker}_spline"] = { 'data': interpolated_data, 'times': interp_times, 'columns': numeric_cols } print(f"{ticker}: 원본 데이터 {len(ticker_df)}개 → 보간 후 {len(interpolated_data)}개 포인트") return data, ticker_encoder, ticker_data def load_stock_data(ticker_string): """ 티커 문자열로부터 주식 데이터를 로드합니다. """ # 티커 문자열 처리 ticker_string = ticker_string[:-5] if ticker_string.endswith('_data') else ticker_string training_tickers = ticker_string.split('_') all_tickers = '_'.join(training_tickers) # 데이터 로드 filename = f"./data/{all_tickers}_data.csv" data = pd.read_csv(filename, parse_dates=['Date']) data = data.set_index('Date') data.sort_index(inplace=True) return data, training_tickers def prepare_data(data, window_size=60, n_interpolation_points=None): """ 시계열 데이터를 윈도우 기반 시퀀스로 준비 """ # 티커를 숫자 ID로 변환 ticker_encoder = LabelEncoder() all_tickers = data['ticker'].unique() ticker_encoder.fit(all_tickers) data['ticker_id'] = ticker_encoder.transform(data['ticker']) # 로그 수익률 계산 data['log_return'] = data.groupby('ticker')['Close'].transform(lambda x: np.log(x).diff()) data['log_return'] = data['log_return'].fillna(0) x_train_list, y_train_list, ticker_train_list, dt_train_list = [], [], [], [] x_val_list, y_val_list, ticker_val_list, dt_val_list = [], [], [], [] x_test_list, y_test_list, ticker_test_list, dt_test_list = [], [], [], [] # 스케일러 저장 scalers = {} for ticker in data['ticker'].unique(): ticker_df = data[data['ticker']==ticker] # 날짜 간격 계산 ticker_df = ticker_df.sort_index() # 날짜순 정렬 ticker_df['days_diff'] = (ticker_df.index.to_series().diff().dt.days).fillna(1.0) # 특성, 라벨, ID, 시간 간격 준비 drop_columns = ['ticker', 'Close', 'days_diff', 'ticker_id', 'log_return'] drop_columns = [col for col in drop_columns if col in ticker_df.columns] # 여기에 정규화 추가 feature_cols = [col for col in ticker_df.columns if col not in drop_columns] if len(feature_cols) > 0: # tanh 정규화 적용 scaled_features, scaler = tanh_scale( ticker_df[feature_cols].values, verbose=False ) scalers[ticker] = { 'scaler': scaler, 'feature_cols': feature_cols } # 정규화된 특성으로 대체 features = scaled_features else: features = np.array([]) # 특성이 없는 경우 처리 labels = ticker_df[['log_return']].values ids = ticker_df['ticker_id'].values time_diffs = ticker_df['days_diff'].values # 시퀀스 생성 (시간 간격 포함) seq_X, seq_Y, seq_ID, seq_dt = [], [], [], [] for i in range(len(features) - window_size): seq_X.append(features[i:i+window_size]) seq_Y.append(labels[i+window_size]) seq_ID.append(ids[i+window_size]) seq_dt.append(time_diffs[i+1:i+window_size+1]) if not seq_X: continue # 배열 변환 seq_X = np.stack(seq_X) seq_Y = np.stack(seq_Y) seq_ID = np.array(seq_ID) seq_dt = np.stack(seq_dt) # 시간 간격 데이터를 numpy 배열로 변환 # 훈련/검증/테스트 분할 n = len(seq_X) test_size = int(n*0.2) val_size = int(n*0.1) train_end = n - test_size - val_size # 각 세트에 추가 x_train_list.append(seq_X[:train_end]) y_train_list.append(seq_Y[:train_end]) ticker_train_list.append(seq_ID[:train_end]) dt_train_list.append(seq_dt[:train_end]) # 시간 간격 데이터 추가 if val_size > 0: x_val_list.append(seq_X[train_end:train_end+val_size]) y_val_list.append(seq_Y[train_end:train_end+val_size]) ticker_val_list.append(seq_ID[train_end:train_end+val_size]) dt_val_list.append(seq_dt[train_end:train_end+val_size]) # 시간 간격 데이터 추가 if test_size > 0: x_test_list.append(seq_X[-test_size:]) y_test_list.append(seq_Y[-test_size:]) ticker_test_list.append(seq_ID[-test_size:]) dt_test_list.append(seq_dt[-test_size:]) # 시간 간격 데이터 추가 # 데이터 없을 경우 예외 처리 if not x_train_list: raise ValueError("데이터 준비 중 오류: 학습 데이터가 없습니다.") # 데이터셋 병합 x_train = np.concatenate(x_train_list, axis=0) y_train = np.concatenate(y_train_list, axis=0) ticker_train = np.concatenate(ticker_train_list, axis=0) time_diffs_train = np.concatenate(dt_train_list, axis=0) # 시간 간격 데이터 병합 # 검증 및 테스트 세트 처리 if x_val_list: x_val = np.concatenate(x_val_list, axis=0) y_val = np.concatenate(y_val_list, axis=0) ticker_val = np.concatenate(ticker_val_list, axis=0) time_diffs_val = np.concatenate(dt_val_list, axis=0) else: # 비어있는 경우 올바른 shape의 빈 배열 생성 x_val = np.empty((0, window_size, x_train.shape[2])) y_val = np.empty((0, 1)) ticker_val = np.empty((0,)) time_diffs_val = np.empty((0, window_size)) if x_test_list: x_test = np.concatenate(x_test_list, axis=0) y_test = np.concatenate(y_test_list, axis=0) ticker_test = np.concatenate(ticker_test_list, axis=0) time_diffs_test = np.concatenate(dt_test_list, axis=0) else: # 비어있는 경우 올바른 shape의 빈 배열 생성 x_test = np.empty((0, window_size, x_train.shape[2])) y_test = np.empty((0, 1)) ticker_test = np.empty((0,)) time_diffs_test = np.empty((0, window_size)) # 라벨의 미분(이산) 계산 y_train_dt = calculate_time_derivative(y_train) y_val_dt = calculate_time_derivative(y_val) if len(y_val) > 0 else None y_test_dt = calculate_time_derivative(y_test) if len(y_test) > 0 else None # 전체 데이터의 날짜 범위 저장 date_min = data.index.min() date_max = data.index.max() # 특성 수 출력 print(f"전처리 완료: 특성 수={x_train.shape[2]}, 학습 샘플 수={x_train.shape[0]}") result_dict = { 'x_train': x_train, 'y_train': y_train, 'ticker_train': ticker_train, 'y_train_dt': y_train_dt, 'time_diffs_train': time_diffs_train, 'x_val': x_val, 'y_val': y_val, 'ticker_val': ticker_val, 'y_val_dt': y_val_dt, 'time_diffs_val': time_diffs_val, 'x_test': x_test, 'y_test': y_test, 'ticker_test': ticker_test, 'y_test_dt': y_test_dt, 'time_diffs_test': time_diffs_test, # 날짜 범위 정보 추가 'start_date': date_min, 'end_date': date_max, 'scalers': scalers, 'data': data } return result_dict, ticker_encoder, data