ostock-backend / model /src /data /normalize.py
johnaness's picture
Deploy OStock FastAPI backend to HF Space (Docker SDK, port 7860)
4be2d4d
"""
๋ฐ์ดํ„ฐ ์ •๊ทœํ™” ์œ ํ‹ธ๋ฆฌํ‹ฐ
"""
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, LabelEncoder
def _report_non_finite(values):
    """Print how many NaN and infinite entries *values* contains, if any."""
    nan_count = np.isnan(values).sum()
    inf_count = np.isinf(values).sum()
    if nan_count or inf_count:
        total = values.size
        print(f"NaN: {nan_count}๊ฐœ, Inf: {inf_count}๊ฐœ / ์ „์ฒด {total}๊ฐœ ({(nan_count+inf_count)/total*100:.2f}%)")


def _columns_to_float(X, verbose):
    """Convert a 2-D non-numeric array to float32, one column at a time.

    A column that cannot be cast to float is label-encoded: every element
    is stringified and replaced by the rank of that string among the
    column's sorted distinct strings.
    """
    X_cleaned = np.zeros(X.shape, dtype=np.float32)
    for col in range(X.shape[1]):
        col_data = X[:, col]
        try:
            X_cleaned[:, col] = col_data.astype(np.float32)
        except (ValueError, TypeError):
            if verbose:
                print(f"๊ฒฝ๊ณ : ์—ด {col}์— ๋น„์ˆ˜์น˜ ๋ฐ์ดํ„ฐ๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ์–ด ์ธ์ฝ”๋”ฉํ•ฉ๋‹ˆ๋‹ค.")
            labels = [str(value) for value in col_data]
            # Sort the distinct labels so the label -> code mapping is
            # reproducible; the iteration order of a set of strings can
            # differ from one interpreter run to the next.
            codes = {label: code for code, label in enumerate(sorted(set(labels)))}
            X_cleaned[:, col] = [codes[label] for label in labels]
    return X_cleaned


def clean_numeric_data(X, replace_nan=0.0, replace_inf=0.0, verbose=False):
    """Convert the input to a float32 array and replace NaN / infinite values.

    Args:
        X: Array-like input (NumPy array, list, DataFrame, ...). ``None``
            and empty inputs are returned unchanged.
        replace_nan: Value substituted for NaN entries.
        replace_inf: Value substituted for ``+inf``; ``-inf`` is replaced
            by ``-replace_inf``.
        verbose: When true, print a summary of the NaN / infinite entries
            found and a notice for every column that had to be
            label-encoded.

    Returns:
        A float32 array with the same shape as the input. Columns that
        cannot be cast to float are label-encoded (see
        ``_columns_to_float``).
    """
    if X is None:
        return X

    # Convert before inspecting ``size`` so plain lists/tuples are accepted.
    arr = np.asarray(X)
    if arr.size == 0:
        return X

    # Work on a 2-D view (rows x flattened remainder); the original shape is
    # restored just before returning.
    original_shape = arr.shape
    if arr.ndim < 2:
        arr = arr.reshape(-1, 1)
    elif arr.ndim > 2:
        arr = arr.reshape(arr.shape[0], -1)

    if np.issubdtype(arr.dtype, np.number):
        X_cleaned = arr.astype(np.float32)
    else:
        X_cleaned = _columns_to_float(arr, verbose)

    if verbose:
        _report_non_finite(X_cleaned)

    X_cleaned = np.nan_to_num(
        X_cleaned, nan=replace_nan, posinf=replace_inf, neginf=-replace_inf
    )
    return X_cleaned.reshape(original_shape)
def tanh_scale(X, replace_nan=0.0, replace_inf=0.0, verbose=False):
    """Clean the data, standardize it, then squash it with ``tanh``.

    Args:
        X: Input data, forwarded to ``clean_numeric_data``.
        replace_nan: Replacement for NaN entries during cleaning.
        replace_inf: Replacement for infinite entries during cleaning.
        verbose: Forwarded to ``clean_numeric_data``.

    Returns:
        A ``(scaled, scaler)`` tuple: the tanh of the standardized data
        (values within [-1, 1]) and the fitted ``StandardScaler``.
    """
    prepared = clean_numeric_data(X, replace_nan, replace_inf, verbose)

    # Standardize first so tanh operates on zero-mean, unit-variance inputs.
    scaler = StandardScaler()
    standardized = scaler.fit_transform(prepared)

    squashed = np.tanh(standardized)
    return squashed, scaler
def rescale_predictions(predictions, actual, eps=1e-8):
    """Rescale predicted log returns to the scale of the actual log returns.

    The predictions are standardized with their own mean and standard
    deviation, then mapped onto the mean and standard deviation of
    ``actual``.

    Args:
        predictions: Predicted values.
        actual: Reference values whose mean / spread should be matched.
        eps: Small constant added to each standard deviation to avoid
            division by zero.

    Returns:
        The predictions shifted and scaled to match ``actual``.
    """
    centered = predictions - np.mean(predictions)
    source_spread = np.std(predictions) + eps
    target_center = np.mean(actual)
    target_spread = np.std(actual) + eps

    standardized = centered / source_spread
    return standardized * target_spread + target_center
def normalize_data(data):
    """Normalize each ticker's features independently with tanh scaling.

    For every distinct value of ``data['ticker']`` the rows are cleaned of
    missing values, sorted by index, enriched with ``days_diff`` (days
    elapsed since the previous row) and ``log_return`` (log of the ratio of
    consecutive ``Close`` values), and the remaining feature columns are
    scaled with ``tanh_scale``.

    Args:
        data: DataFrame with at least ``ticker`` and ``Close`` columns.
            The index must be datetime-like, since ``.dt.days`` is taken
            on the index differences.

    Returns:
        A ``(normalized_data, ticker_encoder, ticker_data)`` tuple:
        the concatenated, index-sorted normalized frame; the
        ``LabelEncoder`` fitted on all tickers; and a dict mapping each
        normalized ticker to its ``original_df``, ``feature_scaler``,
        ``feature_cols`` and ``scaling_method``.

    Raises:
        ValueError: If no ticker has any usable row.
    """
    import warnings  # local import keeps this block self-contained

    tickers = data['ticker'].unique()

    # Encode ticker symbols as integer ids (fitted on every ticker so ids do
    # not depend on which tickers end up being skipped).
    ticker_encoder = LabelEncoder()
    ticker_encoder.fit(tickers)

    non_feature_cols = {'ticker', 'Close', 'Return', 'log_return', 'days_diff'}
    ticker_data = {}
    normalized_dfs = []

    for ticker in tickers:
        # Drop rows with missing values and keep a snapshot of that frame.
        ticker_df = data[data['ticker'] == ticker].copy().dropna()
        original_df = ticker_df.copy()
        ticker_df = ticker_df.sort_index()

        # Elapsed days between consecutive rows; the first row gets 1.0.
        ticker_df['days_diff'] = (
            ticker_df.index.to_series().diff().dt.days.fillna(1.0)
        )
        ticker_df['log_return'] = np.log(
            ticker_df['Close'] / ticker_df['Close'].shift(1)
        )
        # The first row has no previous close, so its log return is NaN.
        ticker_df = ticker_df.dropna()

        if ticker_df.empty:
            # Scaling an empty frame would raise inside StandardScaler and
            # abort the whole run; skip this ticker instead.
            warnings.warn(
                f"Skipping ticker {ticker!r}: no rows left after dropping "
                "missing values",
                stacklevel=2,
            )
            continue

        # Everything that is not a label or bookkeeping column is a feature.
        feature_cols = [
            col for col in ticker_df.columns if col not in non_feature_cols
        ]
        scaled_features, scaler = tanh_scale(
            ticker_df[feature_cols].values, verbose=False
        )

        ticker_data[ticker] = {
            'original_df': original_df,
            'feature_scaler': scaler,
            'feature_cols': feature_cols,
            'scaling_method': 'tanh',
        }

        # Rebuild a frame from the scaled features, then attach the label
        # and metadata columns.
        normalized_df = pd.DataFrame(
            scaled_features, columns=feature_cols, index=ticker_df.index
        )
        normalized_df['log_return'] = ticker_df['log_return']
        normalized_df['ticker'] = ticker
        normalized_df['ticker_id'] = ticker_encoder.transform([ticker])[0]
        normalized_df['Close'] = ticker_df['Close']
        normalized_df['days_diff'] = ticker_df['days_diff']
        normalized_dfs.append(normalized_df)

    if not normalized_dfs:
        raise ValueError("No ticker has enough rows to normalize")

    # Combine all tickers and order the result by index.
    normalized_data = pd.concat(normalized_dfs)
    normalized_data.sort_index(inplace=True)
    return normalized_data, ticker_encoder, ticker_data