Spaces:
Sleeping
Sleeping
| """ | |
| Data normalization utilities | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.preprocessing import StandardScaler, LabelEncoder | |
def clean_numeric_data(X, replace_nan=0.0, replace_inf=0.0, verbose=False):
    """
    Convert the input to a float32 array and replace NaN / infinite values.

    Numeric input is cast directly. For non-numeric input each column is
    first cast to float; a column that cannot be cast is label-encoded by
    mapping the string form of each value to its rank among the column's
    sorted distinct strings, so the codes are reproducible between runs.

    Args:
        X: Array-like of any dimensionality (list, tuple, ndarray, ...).
            1-D input is processed as a single column; input with three or
            more dimensions is flattened to ``(shape[0], -1)`` for
            processing. The original shape is restored before returning.
        replace_nan: Value substituted for NaN.
        replace_inf: Value substituted for +inf; -inf becomes
            ``-replace_inf``.
        verbose: If True, print how many NaN / Inf values were found and a
            notice for every column that had to be label-encoded.

    Returns:
        A float32 array with the same shape as the input. ``None`` and
        empty inputs are returned unchanged.
    """
    if X is None:
        return X

    # Convert before touching ``.size`` so plain lists/tuples are accepted.
    arr = np.asarray(X)
    if arr.size == 0:
        return X
    X = arr

    # Work on a 2-D view; the original shape is restored at the end.
    original_shape = X.shape
    if X.ndim == 1:
        X = X.reshape(-1, 1)
    elif X.ndim >= 3:
        X = X.reshape(X.shape[0], -1)

    if np.issubdtype(X.dtype, np.number):
        X_cleaned = X.astype(np.float32)
    else:
        X_cleaned = np.zeros((X.shape[0], X.shape[1]), dtype=np.float32)
        for col in range(X.shape[1]):
            column = X[:, col]
            try:
                X_cleaned[:, col] = column.astype(np.float32)
            except (ValueError, TypeError):
                if verbose:
                    print(f"경고: 열 {col}에 비수치 데이터가 포함되어 있어 인코딩합니다.")
                labels = [str(value) for value in column]
                # Sorting makes the codes independent of set iteration
                # order, which varies with string hash randomization.
                codes = {
                    label: code
                    for code, label in enumerate(sorted(set(labels)))
                }
                X_cleaned[:, col] = [codes[label] for label in labels]

    if verbose:
        nan_count = np.isnan(X_cleaned).sum()
        inf_count = np.isinf(X_cleaned).sum()
        if nan_count or inf_count:
            total = X_cleaned.size
            print(f"NaN: {nan_count}개, Inf: {inf_count}개 / 전체 {total}개 ({(nan_count+inf_count)/total*100:.2f}%)")

    X_cleaned = np.nan_to_num(
        X_cleaned, nan=replace_nan, posinf=replace_inf, neginf=-replace_inf
    )

    # Undo the temporary 2-D reshape.
    if np.shape(X_cleaned) != original_shape:
        X_cleaned = X_cleaned.reshape(original_shape)
    return X_cleaned
def tanh_scale(X, replace_nan=0.0, replace_inf=0.0, verbose=False):
    """
    Clean the data, standardize it, then squash it with tanh.

    Args:
        X: Input data, passed to ``clean_numeric_data`` together with
            ``replace_nan``, ``replace_inf`` and ``verbose``.
        replace_nan: Replacement for NaN values during cleaning.
        replace_inf: Replacement for infinite values during cleaning.
        verbose: Forwarded to ``clean_numeric_data``.

    Returns:
        A tuple ``(scaled, scaler)``: the tanh of the standardized data and
        the ``StandardScaler`` instance that was fitted on the cleaned data.
    """
    standardizer = StandardScaler()
    sanitized = clean_numeric_data(X, replace_nan, replace_inf, verbose)

    # Standardize first, then map the result through tanh.
    squashed = np.tanh(standardizer.fit_transform(sanitized))
    return squashed, standardizer
def rescale_predictions(predictions, actual, eps=1e-8):
    """
    Shift and scale predictions to match the mean and spread of ``actual``.

    Per the original docstring this is meant for predicted versus actual
    log returns, but the arithmetic itself is generic.

    Args:
        predictions: Predicted values; must support array arithmetic.
        actual: Reference values whose mean and standard deviation define
            the target distribution.
        eps: Added to both standard deviations so the division stays
            finite when the predictions are constant.

    Returns:
        ``predictions`` standardized with its own mean/std and then mapped
        onto the mean/std of ``actual``.
    """
    target_center = np.mean(actual)
    target_spread = np.std(actual) + eps
    source_center = np.mean(predictions)
    source_spread = np.std(predictions) + eps

    # Standardize against the source statistics, then map onto the target.
    standardized = (predictions - source_center) / source_spread
    return standardized * target_spread + target_center
def normalize_data(data):
    """
    Normalize features separately for every ticker using tanh scaling.

    For each distinct value in ``data['ticker']`` the rows are selected,
    rows with missing values are dropped, the frame is sorted by index, and
    two columns are derived: ``days_diff`` (day gap between consecutive
    index entries, 1.0 for the first row) and ``log_return`` (log of
    ``Close`` over the previous ``Close``). The first row of each ticker
    has no previous close, so it is removed by the second ``dropna``.
    All remaining columns except ``ticker``, ``Close``, ``Return``,
    ``log_return`` and ``days_diff`` are scaled with ``tanh_scale``.

    Args:
        data: DataFrame with at least ``ticker`` and ``Close`` columns.
            NOTE(review): the index is assumed to be datetime-like because
            ``.dt.days`` is applied to its differences - confirm with
            callers.

    Returns:
        A tuple ``(normalized_data, ticker_encoder, ticker_data)``:
        the concatenated, index-sorted per-ticker frames (scaled features
        plus ``log_return``, ``ticker``, ``ticker_id``, ``Close`` and
        ``days_diff``); the ``LabelEncoder`` fitted on the ticker values;
        and a dict keyed by ticker holding ``original_df``,
        ``feature_scaler``, ``feature_cols`` and ``scaling_method``.
    """
    excluded = ['ticker', 'Close', 'Return', 'log_return', 'days_diff']
    tickers = data['ticker'].unique()

    ticker_encoder = LabelEncoder()
    ticker_encoder.fit(tickers)

    ticker_data = {}
    frames = []
    for ticker in tickers:
        subset = data[data['ticker'] == ticker].copy().dropna()
        # Snapshot taken before sorting and before derived columns exist.
        snapshot = subset.copy()

        subset = subset.sort_index()
        gaps = subset.index.to_series().diff()
        subset['days_diff'] = gaps.dt.days.fillna(1.0)
        close = subset['Close']
        subset['log_return'] = np.log(close / close.shift(1))
        subset = subset.dropna()

        feature_cols = [name for name in subset.columns if name not in excluded]
        scaled_features, scaler = tanh_scale(subset[feature_cols].values, verbose=False)

        ticker_data[ticker] = {
            'original_df': snapshot,
            'feature_scaler': scaler,
            'feature_cols': feature_cols,
            'scaling_method': 'tanh',
        }

        # Scaled features first, then label and metadata columns in the
        # same order the columns are expected downstream.
        frame = pd.DataFrame(scaled_features, columns=feature_cols, index=subset.index)
        frame = frame.assign(
            log_return=subset['log_return'],
            ticker=ticker,
            ticker_id=ticker_encoder.transform([ticker])[0],
            Close=subset['Close'],
            days_diff=subset['days_diff'],
        )
        frames.append(frame)

    normalized_data = pd.concat(frames).sort_index()
    return normalized_data, ticker_encoder, ticker_data