""" 저장된 TensorFlow Lite 모델과 설정을 사용하여 종목에 대한 다음 날 예측 수행 """ import os import sys import json import argparse # NOTE: tensorflow 는 반드시 pandas/numpy 보다 먼저 import 해야 한다. # pandas/numpy 가 OpenMP 런타임(libiomp5md.dll)을 먼저 올리면 tensorflow 네이티브 # DLL(_pywrap_tensorflow_internal) 초기화가 중복 충돌하여 WinError 1114 로 죽는다. import tensorflow as tf import pandas as pd import numpy as np from datetime import datetime, timedelta from pathlib import Path import subprocess import pickle import warnings # 경고 무시 warnings.filterwarnings('ignore', category=UserWarning, module='tensorflow') # 모듈 경로 추가 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.optimization.utils import predict_with_tflite def get_project_root(): """프로젝트 루트 디렉토리를 반환합니다.""" return Path(__file__).parent.parent def check_and_data(tickers): """ 주식 데이터가 존재하는지 확인하고, 없으면 data.py 스크립트로 가져옵니다. """ ticker_list = tickers.split('_') if '_' in tickers else [tickers] data_dir = Path(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) / 'data' data_dir.mkdir(parents=True, exist_ok=True) missing_tickers = [ ticker for ticker in ticker_list if not (data_dir / f'{ticker}_data.csv').exists() ] if not missing_tickers: return True print(f"누락된 종목 데이터 가져오기: {', '.join(missing_tickers)}") missing_str = '_'.join(missing_tickers) try: script_path = Path(__file__).parent / 'data.py' result = subprocess.run( [sys.executable, str(script_path), '--tickers', missing_str], check=True, capture_output=True, text=True ) print(result.stdout) return True except subprocess.CalledProcessError as e: print(f"데이터 가져오기 실패: {e}") print(f"오류 출력: {e.stderr}") return False def load_model_and_config(model_path, config_path): """ TensorFlow Lite 모델, 설정, 인코더 정보를 로드합니다. """ try: # 설정 파일 로드 with open(config_path, 'r') as f: config = json.load(f) # 인코더 정보 로드 encoders = None model_path_obj = Path(model_path) encoder_path = model_path_obj.with_suffix('').with_name(f"{model_path_obj.stem}_encoders.json") if os.path.exists(encoder_path): with open(encoder_path, 'r') as f: encoders = json.load(f) # TensorFlow Lite 모델 로드 print(f"모델 로드 중: {Path(model_path).name}") interpreter = tf.lite.Interpreter(model_path=str(model_path)) interpreter.allocate_tensors() return interpreter, config, encoders except Exception as e: print(f"모델 로드 실패: {e}") return None, config, encoders def predict_next_day(model_path, config_path, ticker, output_file='next_day_prediction.csv'): """ 다음 날 주가 예측 함수 """ # 모델, 설정, 인코더 로드 model_or_interpreter, cfg, encoders = load_model_and_config(model_path, config_path) if model_or_interpreter is None or cfg is None: print("모델 로드에 실패했습니다.") return None threshold = cfg.get('best_threshold', cfg.get('threshold', 0.0)) # 파일 경로 설정 tickers_path = ticker.replace(',', '_') if ',' in ticker else ticker data_dir = get_project_root() / "data" processed_dir = data_dir / "processed" processed_path = processed_dir / f"{tickers_path}_processed.pkl" # 전처리된 파일이 없을 경우 data.py 실행 if not processed_path.exists(): try: script_path = Path(__file__).parent / 'data.py' result = subprocess.run( [sys.executable, str(script_path), '--tickers', ticker], check=True, capture_output=True, text=True ) except: return None # 전처리된 데이터 로드 try: print(f"{ticker} 데이터 로드 중...") with open(processed_path, 'rb') as f: processed_data = pickle.load(f) # 데이터 딕셔너리 추출 data_dict = processed_data[0] if isinstance(processed_data, tuple) else processed_data # 시퀀스 데이터 선택 (테스트 > 검증 > 훈련 순) if 'x_test' in data_dict and len(data_dict['x_test']) > 0: source = 'test' elif 'x_val' in data_dict and len(data_dict['x_val']) > 0: source = 'val' elif 'x_train' in data_dict and len(data_dict['x_train']) > 0: source = 'train' else: return None # 필요한 데이터 추출 last_sequence = data_dict[f'x_{source}'][-1:] last_ticker = data_dict[f'ticker_{source}'][-1] last_time_diff = data_dict[f'time_diffs_{source}'][-1:] # 섹터/산업 정보 (있으면 사용, 없으면 0으로 설정) sector_id = 0 industry_id = 0 if f'sector_{source}' in data_dict: sector_id = data_dict[f'sector_{source}'][-1] if f'industry_{source}' in data_dict: industry_id = data_dict[f'industry_{source}'][-1] print(f"예측 수행 중...") if hasattr(model_or_interpreter, 'predict'): inputs = [ tf.cast(last_sequence, tf.float32), tf.cast(np.array([last_ticker]), tf.int32), tf.cast(np.array([sector_id]), tf.int32), tf.cast(np.array([industry_id]), tf.int32), tf.cast(last_time_diff, tf.float32), ] y_pred_all = model_or_interpreter.predict(inputs, verbose=0) y_pred = y_pred_all[0] if isinstance(y_pred_all, list) else y_pred_all else: # TensorFlow Lite 인터프리터 # 입력 텐서 정보 가져오기 input_details = model_or_interpreter.get_input_details() # 필요한 경우 입력 데이터 재구성 inputs = [] for i, detail in enumerate(input_details): name = detail['name'].lower() if hasattr(detail['name'], 'lower') else "" if 'time' in name: inputs.append(last_time_diff.astype(np.float32)) elif 'ticker' in name: inputs.append(np.array([last_ticker], dtype=np.int32)) elif 'industry' in name: inputs.append(np.array([industry_id], dtype=np.int32)) elif 'sector' in name: inputs.append(np.array([sector_id], dtype=np.int32)) else: seq_data = last_sequence.astype(np.float32) if len(detail['shape']) == 4 and len(seq_data.shape) == 3: seq_data = np.expand_dims(seq_data, axis=-1) inputs.append(seq_data) if not inputs or len(inputs) != len(input_details): inputs = [ last_time_diff.astype(np.float32), last_sequence.astype(np.float32), np.array([last_ticker], dtype=np.int32), np.array([industry_id], dtype=np.int32), np.array([sector_id], dtype=np.int32) ] # TensorFlow Lite 모델로 예측 y_pred = predict_with_tflite(model_or_interpreter, inputs, verbose=False) if y_pred is None: return None # 예측값 추출 if isinstance(y_pred, list): value_output = y_pred[0] if len(value_output.shape) == 3: pred_value = float(value_output[0, -1, 0]) elif len(value_output.shape) == 2: pred_value = float(value_output[0, 0]) else: pred_value = float(value_output.flatten()[-1]) elif hasattr(y_pred, 'shape'): if len(y_pred.shape) == 3: pred_value = float(y_pred[0, -1, 0]) elif y_pred.shape == (1,): pred_value = float(y_pred[0]) elif y_pred.shape == (1, 1): pred_value = float(y_pred[0, 0]) else: pred_value = float(y_pred.flatten()[-1]) else: pred_value = float(y_pred) signal = 'BUY' if pred_value > threshold else 'SELL' if pred_value < -threshold else 'HOLD' confidence = abs(pred_value) # 결과 출력 print(f"\n===== {ticker} 다음 날 예측 =====") print(f"예측값: {pred_value:.6f}") print(f"임계값: {threshold:.6f}") print(f"신호: {signal}") print(f"신뢰도: {confidence:.6f}") # 결과를 CSV로 저장 results_df = pd.DataFrame({ 'ticker': [ticker], 'prediction_date': [datetime.now().strftime('%Y-%m-%d')], 'predicted_value': [pred_value], 'threshold': [threshold], 'signal': [signal], 'confidence': [confidence] }) # 모델 디렉토리에 저장 models_dir = get_project_root() / "models" models_dir.mkdir(exist_ok=True) output_path = models_dir / output_file results_df.to_csv(output_path, index=False) print(f"결과 저장: {output_path}") return results_df except Exception as e: return None def main(): parser = argparse.ArgumentParser(description="저장된 TensorFlow Lite 모델로 다음 날 주가 예측") parser.add_argument('--model', type=str, default='models/best_contime_grid_search.tflite', help='저장된 TensorFlow Lite 모델 경로') parser.add_argument('--config', type=str, default='models/results/best_contime_grid_search_meta.json', help='저장된 설정 파일 경로') parser.add_argument('--tickers', type=str, required=True, help='예측할 종목 (단일 종목)') parser.add_argument('--output', type=str, default='predictions.csv', help='예측 결과 저장 경로') args = parser.parse_args() # 데이터 파일이 있는지 확인하고, 없으면 데이터 가져오기 if not check_and_data(args.tickers): print("데이터 준비에 실패했습니다.") return # 다중 종목 처리 방지 if '_' in args.tickers: print("단일 종목만 예측 가능합니다. 여러 종목은 개별적으로 실행해주세요.") return # 다음날 예측 수행 predict_next_day( model_path=args.model, config_path=args.config, ticker=args.tickers, output_file=args.output ) if __name__ == "__main__": main() # ----- 포트폴리오 성능 ----- # 테스트 세트 총 수익률: 0.2360 # 테스트 세트 샤프 비율: 0.0472 # 테스트 세트 최대 낙폭: -0.1322 # 테스트 세트 거래 수: 54 # ----- 개별 종목 평균 성능 ----- # 테스트 세트 평균 종목 수익률: 0.2360 # 테스트 세트 평균 종목 샤프 비율: 0.0457