""" 데이터 정규화 유틸리티 """ import numpy as np import pandas as pd from sklearn.preprocessing import StandardScaler, LabelEncoder def clean_numeric_data(X, replace_nan=0.0, replace_inf=0.0, verbose=False): """ 입력 데이터를 수치형으로 변환하고 이상값 처리 """ if X is None or X.size == 0: return X # numpy 배열로 변환 X = np.asarray(X) # 입력이 1차원인 경우 2차원으로 변환 if X.ndim == 1: X = X.reshape(-1, 1) was_1d = True was_3d = False original_shape = None elif X.ndim == 3: # 3차원 배열인 경우 2차원으로 reshape original_shape = X.shape X = X.reshape(X.shape[0], -1) was_3d = True was_1d = False else: was_1d = False was_3d = False original_shape = None if np.issubdtype(X.dtype, np.number): # 수치형 데이터의 경우 바로 이상값 처리 X_cleaned = X.astype(np.float32) # NaN과 무한값 처리 nan_mask = np.isnan(X_cleaned) inf_mask = np.isinf(X_cleaned) if verbose and (nan_mask.any() or inf_mask.any()): nan_count = nan_mask.sum() inf_count = inf_mask.sum() total = X_cleaned.size print(f"NaN: {nan_count}개, Inf: {inf_count}개 / 전체 {total}개 ({(nan_count+inf_count)/total*100:.2f}%)") # 이상값 대체 X_cleaned = np.nan_to_num(X_cleaned, nan=replace_nan, posinf=replace_inf, neginf=-replace_inf) else: # 비수치형 데이터 처리 X_cleaned = np.zeros((X.shape[0], X.shape[1]), dtype=np.float32) for col in range(X.shape[1]): try: # 수치형으로 변환 시도 col_data = X[:, col] X_cleaned[:, col] = col_data.astype(np.float32) except (ValueError, TypeError): # 문자열 값을 숫자로 인코딩 if verbose: print(f"경고: 열 {col}에 비수치 데이터가 포함되어 있어 인코딩합니다.") # 각 원소를 문자열로 변환하여 고유값 찾기 col_data = X[:, col] str_data = [str(x) for x in col_data.flatten()] unique_vals = list(set(str_data)) val_map = {val: i for i, val in enumerate(unique_vals)} for i in range(X.shape[0]): X_cleaned[i, col] = float(val_map.get(str(X[i, col]), 0)) # 변환 후 NaN과 무한값 처리 nan_mask = np.isnan(X_cleaned) inf_mask = np.isinf(X_cleaned) if verbose and (nan_mask.any() or inf_mask.any()): nan_count = nan_mask.sum() inf_count = inf_mask.sum() total = X_cleaned.size print(f"NaN: {nan_count}개, Inf: {inf_count}개 / 전체 {total}개 ({(nan_count+inf_count)/total*100:.2f}%)") # 이상값 대체 X_cleaned = np.nan_to_num(X_cleaned, nan=replace_nan, posinf=replace_inf, neginf=-replace_inf) # 원래 차원으로 복원 if was_1d: X_cleaned = X_cleaned.flatten() elif was_3d: X_cleaned = X_cleaned.reshape(original_shape) return X_cleaned def tanh_scale(X, replace_nan=0.0, replace_inf=0.0, verbose=False): """ Tanh 스케일링 적용: 데이터 정리 후 [-1, 1] 범위로 변환 """ # 통합된 데이터 정리 함수 사용 X_cleaned = clean_numeric_data(X, replace_nan, replace_inf, verbose) # 표준 스케일링 적용 scaler = StandardScaler() X_scaled = scaler.fit_transform(X_cleaned) # tanh 변환으로 [-1, 1] 범위로 매핑 return np.tanh(X_scaled), scaler def rescale_predictions(predictions, actual, eps=1e-8): """ 예측된 로그 수익률을 실제 로그 수익률 스케일로 조정 """ # 수치 안정성 보장 pred_mean = np.mean(predictions) pred_std = np.std(predictions) + eps act_mean = np.mean(actual) act_std = np.std(actual) + eps # 분포 매칭을 통한 재조정 return (predictions - pred_mean) / pred_std * act_std + act_mean def normalize_data(data): """ 종목별 개별 정규화 적용 (Tanh 스케일링) """ ticker_data = {} normalized_dfs = [] # 종목별 ID 인코딩 ticker_encoder = LabelEncoder() ticker_encoder.fit(data['ticker'].unique()) # 종목별로 데이터 처리 for ticker in data['ticker'].unique(): ticker_df = data[data['ticker'] == ticker].copy() # 결측치 제거 ticker_df = ticker_df.dropna() ticker_data[ticker] = {'original_df': ticker_df.copy()} ticker_df = ticker_df.sort_index() # 연속 거래일 사이의 실제 경과 일수 계산 (dt) ticker_df['days_diff'] = ticker_df.index.to_series().diff().dt.days.fillna(1.0) # 로그 수익률 계산 ticker_df['log_return'] = np.log(ticker_df['Close'] / ticker_df['Close'].shift(1)) ticker_df = ticker_df.dropna() # 특성과 레이블 분리 feature_cols = [col for col in ticker_df.columns if col not in ['ticker', 'Close', 'Return', 'log_return', 'days_diff']] # 통합된 Tanh 스케일링 적용 scaled_features, scaler = tanh_scale(ticker_df[feature_cols].values, verbose=False) # 스케일러 저장 ticker_data[ticker]['feature_scaler'] = scaler ticker_data[ticker]['feature_cols'] = feature_cols ticker_data[ticker]['scaling_method'] = 'tanh' # 정규화된 데이터프레임 생성 features_df = pd.DataFrame(scaled_features, columns=feature_cols, index=ticker_df.index) # 라벨과 메타데이터 추가 normalized_df = features_df.copy() normalized_df['log_return'] = ticker_df['log_return'] normalized_df['ticker'] = ticker normalized_df['ticker_id'] = ticker_encoder.transform([ticker])[0] normalized_df['Close'] = ticker_df['Close'] normalized_df['days_diff'] = ticker_df['days_diff'] normalized_dfs.append(normalized_df) # 모든 정규화 데이터 합치기 normalized_data = pd.concat(normalized_dfs) normalized_data.sort_index(inplace=True) return normalized_data, ticker_encoder, ticker_data