""" 데이터 통합 및 전처리 모듈 """ import pandas as pd import yfinance as yf from .technical_indicators import add_technical_indicators from .optimization import run_technical_optimization from .financial_data import process_financial_data from .economic_data import get_economic_data from .hierarchical_embedding import get_industry_data, add_industry_encoding, combine_stocks_for_embedding from src.config import FRED_API_KEY def process_stock_data(tickers, start_date, end_date, fred_api_key=None): """ 주식 데이터 처리의 전체 파이프라인을 실행하는 함수 """ # API 키 기본값 처리 if fred_api_key is None: fred_api_key = FRED_API_KEY print(f"주식 데이터 처리 시작: {len(tickers)}개 종목") # 1. 기술적 지표 최적화 optimal_params = run_technical_optimization(tickers, start_date, end_date) # 2. 최적화된 파라미터로 데이터셋 생성 all_data = {} for ticker in tickers: df = yf.download(ticker, start=start_date, end=end_date, auto_adjust=True) if isinstance(df.columns, pd.MultiIndex): df.columns = df.columns.droplevel(1) # 최적화된 파라미터로 기술적 지표 추가 df_with_indicators = add_technical_indicators( df.copy(), ema_params=optimal_params['ema'], macd_params=optimal_params['macd'], cmf_period=optimal_params['cmf'], rsi_params=optimal_params['rsi'] ) # 결측치 제거 df_with_indicators = df_with_indicators.dropna() all_data[ticker] = df_with_indicators print(f"{ticker} 데이터 처리 완료: {len(df_with_indicators)}행") # 3. 재무제표 데이터 처리 및 통합 print("\n===== 재무제표 데이터 처리 시작 =====\n") financial_data_all = {} common_features = None # 모든 종목의 재무 데이터 수집 for ticker in tickers: if ticker not in all_data: continue financial_data = process_financial_data(ticker, all_data, end_date) if financial_data is not None and not financial_data.empty: financial_features = [col for col in financial_data.columns if col != 'Close'] if financial_features: financial_data_all[ticker] = financial_data # 공통 특성 추적 if common_features is None: common_features = set(financial_features) else: common_features = common_features.intersection(set(financial_features)) # 4. 경제 지표 데이터 추가 print("\n===== 경제 지표 데이터 처리 시작 =====\n") econ_df = get_economic_data(start_date, end_date, fred_api_key) # 5. 데이터 최종 통합 (기술 지표 + 재무제표 + 경제 지표) print("\n===== 최종 데이터 통합 =====\n") # 공통 재무 특성 선택 if common_features: selected_common_features = list(common_features)[:20] # 상위 20개 print(f"모든 종목에 공통인 재무 특성 중 {len(selected_common_features)}개 선택") else: selected_common_features = [] print("공통 재무 특성이 없습니다.") # 각 종목별 데이터 통합 for ticker in tickers: if ticker in all_data: try: # 기본 데이터 (기술적 지표 포함) stock_data = all_data[ticker].copy() # 재무제표 데이터 추가 (선택된 공통 특성만) if ticker in financial_data_all and selected_common_features: fin_data = financial_data_all[ticker][selected_common_features] stock_data = stock_data.join(fin_data, how='left') # 경제 지표 데이터 추가 stock_data = stock_data.join(econ_df, how='left') # 결측치 처리 for col in stock_data.columns: if col != 'Close' and stock_data[col].isna().any(): stock_data[col] = stock_data[col].interpolate(method='linear') stock_data = stock_data.dropna() # 최종 데이터 저장 all_data[ticker] = stock_data print(f"{ticker} 데이터 통합 완료: {stock_data.shape[1]}개 특성") except Exception as e: print(f"{ticker} 데이터 통합 실패: {e}") # 6. 산업 정보 가져오기 및 데이터 통합 industry_df = get_industry_data(tickers) # 7. 종목 임베딩을 위한 데이터 통합 combined_data = combine_stocks_for_embedding(all_data, tickers) # 8. 산업 정보 인코딩 추가 final_data, industry_encoders = add_industry_encoding(combined_data, industry_df) return final_data, all_data, industry_encoders