""" 나스닥 & 뉴욕증권거래소 주식 데이터 수집 및 허깅페이스 데이터셋 생성 - 수집: 나스닥/뉴욕 전체 티커를 야후 파이낸스로 일별 데이터 조회 (전체기간) - 데이터셋 생성: all 데이터셋 + 최근 30일 데이터셋 자동 생성 """ import gradio as gr import yfinance as yf import pandas as pd from datetime import datetime, timedelta from zoneinfo import ZoneInfo from datasets import Dataset, load_dataset from huggingface_hub import HfApi import os import time import logging import json import traceback import gc import tempfile import uuid from urllib.request import Request, urlopen # 로깅 설정 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) logging.getLogger("yfinance").setLevel(logging.ERROR) logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) # 허깅페이스 토큰 (Spaces 시크릿에서 가져옴) HF_TOKEN = os.environ.get("HF_TOKEN", "") def get_ny_today_str(): """뉴욕 현지 날짜(YYYY-MM-DD) 반환""" ny_tz = ZoneInfo("America/New_York") return datetime.now(ny_tz).strftime("%Y-%m-%d") def is_us_market_open_now(): """미국 정규장(뉴욕시간 09:30~16:00) 장중 여부 반환""" ny_tz = ZoneInfo("America/New_York") now_ny = datetime.now(ny_tz) # 월(0)~금(4)만 정규장 if now_ny.weekday() >= 5: return False, now_ny minutes = now_ny.hour * 60 + now_ny.minute market_open = 9 * 60 + 30 market_close = 16 * 60 return market_open <= minutes < market_close, now_ny def fetch_tradingview_realtime(tickers, batch_size=400): """TradingView Screener API로 티커별 오늘 OHLCV 조회""" if not tickers: return [] results = {} today_str = get_ny_today_str() for i in range(0, len(tickers), batch_size): batch = tickers[i:i + batch_size] payload = { "symbols": { "tickers": [ *[f"NASDAQ:{t}" for t in batch], *[f"NYSE:{t}" for t in batch], *[f"AMEX:{t}" for t in batch], ] }, "columns": ["close", "open", "high", "low", "volume", "exchange"], "options": {"lang": "en"}, "markets": ["america"], "range": [0, max(50, len(batch) * 3)] } req = Request( "https://scanner.tradingview.com/america/scan", data=json.dumps(payload).encode("utf-8"), headers={ "Content-Type": "application/json", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" }, method="POST" ) with urlopen(req, timeout=30) as resp: body = resp.read().decode("utf-8") parsed = json.loads(body) for item in parsed.get("data", []): symbol = item.get("s", "") if ":" not in symbol: continue _, ticker = symbol.split(":", 1) if ticker in results: continue data = item.get("d", []) if len(data) < 5: continue close_val, open_val, high_val, low_val, volume_val = data[:5] if close_val is None: continue results[ticker] = { "Ticker": ticker, "Date": today_str, "Open": round(float(open_val), 4) if open_val is not None else None, "High": round(float(high_val), 4) if high_val is not None else None, "Low": round(float(low_val), 4) if low_val is not None else None, "Close": round(float(close_val), 4) if close_val is not None else None, "Volume": int(volume_val) if volume_val is not None else None } time.sleep(0.2) return list(results.values()) def load_hf_dataset_as_df(repo_name, hf_token): """HF Hub 데이터셋을 pandas DataFrame으로 로드""" ds = load_dataset(repo_name, split="train", token=hf_token) df = ds.to_pandas() # 컬럼 표준화 required_cols = ["Ticker", "Date", "Open", "High", "Low", "Close", "Volume"] for col in required_cols: if col not in df.columns: df[col] = None df = df[required_cols] df["Ticker"] = df["Ticker"].astype(str).str.upper() df["Date"] = df["Date"].astype(str) return df def run_realtime_update( hf_token, all_dataset_name, recent_dataset_name, progress=gr.Progress() ): """ 실시간(장중) 데이터 업데이트 - 장중 여부 메시지 출력(썸머타임 자동 반영) - TradingView Screener로 데이터셋 티커 일괄 조회 - all: 오늘 데이터 추가(append-only) - 30d: 오래된 데이터 제거 후 오늘 데이터 반영 """ if not hf_token: return "❌ 허깅페이스 토큰이 필요합니다. HF_TOKEN 환경변수 또는 입력창에 토큰을 넣어주세요." logs = [] logs.append("=" * 60) logs.append("⚡ 실시간 데이터 업데이트 시작") logs.append(f"⏰ 시작 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") logs.append("=" * 60) # 0) 장중 상태 확인 (DST 자동 반영) progress(0.05, desc="장중 여부 확인 중...") is_open, now_ny = is_us_market_open_now() if is_open: logs.append(f"🟢 장중입니다. (뉴욕시간 {now_ny.strftime('%Y-%m-%d %H:%M:%S')})") else: logs.append(f"🟡 장중이 아닙니다. (뉴욕시간 {now_ny.strftime('%Y-%m-%d %H:%M:%S')})") today_str = get_ny_today_str() # 1) 데이터셋 로드 progress(0.15, desc="기존 데이터셋 로드 중...") logs.append("\n📥 [1단계] 기존 데이터셋 로드 중...") all_df = load_hf_dataset_as_df(all_dataset_name, hf_token) recent_df = load_hf_dataset_as_df(recent_dataset_name, hf_token) # 조회 티커는 데이터셋에 있는 티커 기준 tickers = sorted(recent_df["Ticker"].dropna().astype(str).str.upper().unique().tolist()) if not tickers: tickers = sorted(all_df["Ticker"].dropna().astype(str).str.upper().unique().tolist()) if not tickers: return "\n".join(logs) + "\n\n❌ 데이터셋에 티커가 없어 실시간 조회를 진행할 수 없습니다." logs.append(f" - 조회 대상 티커 수: {len(tickers)}") # 2) 오늘 데이터 이미 있는지 확인 progress(0.25, desc="오늘 데이터 존재 여부 확인 중...") today_rows = all_df[all_df["Date"] == today_str] today_tickers = set(today_rows["Ticker"].astype(str).str.upper().tolist()) if set(tickers).issubset(today_tickers): logs.append("\n✅ 이미 수집했습니다") return "\n".join(logs) # 3) TradingView 실시간 조회 progress(0.45, desc="TradingView 실시간 조회 중...") logs.append("\n📡 [2단계] TradingView Screener 조회 중...") realtime_rows = fetch_tradingview_realtime(tickers) if not realtime_rows: return "\n".join(logs) + "\n\n❌ TradingView에서 실시간 데이터를 가져오지 못했습니다." realtime_df = pd.DataFrame(realtime_rows) realtime_df["Ticker"] = realtime_df["Ticker"].astype(str).str.upper() realtime_df["Date"] = realtime_df["Date"].astype(str) logs.append(f" - 수신 성공 티커 수: {realtime_df['Ticker'].nunique()}") # 4) all 데이터셋 업데이트 (추가만) progress(0.65, desc="all 데이터셋 업데이트 중...") logs.append("\n🧩 [3단계] all 데이터셋 업데이트(추가만)...") existing_today = set(all_df.loc[all_df["Date"] == today_str, "Ticker"].astype(str).str.upper().tolist()) add_all_df = realtime_df[~realtime_df["Ticker"].isin(existing_today)].copy() if not add_all_df.empty: all_updated_df = pd.concat([all_df, add_all_df], ignore_index=True) else: all_updated_df = all_df logs.append(f" - all 추가 건수: {len(add_all_df)}") # 5) 30d 데이터셋 업데이트 (오래된 날짜 제거 + 추가/갱신) progress(0.78, desc="30d 데이터셋 업데이트 중...") logs.append("\n🗓️ [4단계] 30d 데이터셋 업데이트(오래된 데이터 제거 + 추가)...") # 같은 티커/오늘 날짜가 기존에 있으면 교체를 위해 제거 update_tickers = set(realtime_df["Ticker"].tolist()) recent_df_wo_today = recent_df[ ~((recent_df["Date"] == today_str) & (recent_df["Ticker"].isin(update_tickers))) ].copy() recent_merged = pd.concat([recent_df_wo_today, realtime_df], ignore_index=True) recent_updated_df = filter_last_30_days(recent_merged) # 6) 업로드 progress(0.9, desc="허깅페이스 업로드 중...") logs.append("\n🚀 [5단계] 허깅페이스 업로드 중...") result_all = upload_dataset_to_hf(all_updated_df, all_dataset_name, hf_token) result_30d = upload_dataset_to_hf(recent_updated_df, recent_dataset_name, hf_token) logs.append(f" {result_all}") logs.append(f" {result_30d}") progress(1.0, desc="완료!") logs.append("\n" + "=" * 60) logs.append("✅ 실시간 데이터 업데이트 완료") logs.append(f"📅 반영 날짜(뉴욕 기준): {today_str}") logs.append("=" * 60) return "\n".join(logs) def get_all_us_tickers(): """ 야후 파이낸스 스크리너(yf.screen)를 사용하여 나스닥 + 뉴욕증권거래소 티커 목록을 가져옴. - 나스닥은 3개 마켓으로 구성: NMS(글로벌셀렉트), NGM(글로벌마켓), NCM(캐피털마켓) - 뉴욕증권거래소: NYQ - yfinance 내장 기능이라 별도 데이터 소스 불필요, HF Spaces에서도 동작 반환: (nasdaq_tickers, nyse_tickers, all_tickers) """ def _fetch_exchange_tickers(exchange_code): """야후 스크리너에서 특정 거래소의 전체 티커를 페이징으로 가져오기""" query = yf.EquityQuery("eq", ["exchange", exchange_code]) symbols = [] offset = 0 while True: result = yf.screen(query, size=250, offset=offset) quotes = result.get("quotes", []) if not quotes: break for quote in quotes: sym = quote.get("symbol", "") if sym: # [필터] # 1. '-', '.', '$'가 포함된 티커 (우선주, 유닛 등) 제외 # 2. 티커가 5자이면서 마지막이 W(Warrant), R(Right), U(Unit)인 파생 종목 제외 is_special = any(c in sym for c in ["-", ".", "$"]) is_derivative = len(sym) == 5 and sym[-1] in ["W", "R", "U"] if not (is_special or is_derivative): symbols.append(sym) offset += len(quotes) total = result.get("total", 0) if offset >= total: break return sorted(list(set(symbols))) try: # 나스닥: 3개 마켓 합산 # NMS = NASDAQ Global Select Market # NGM = NASDAQ Global Market # NCM = NASDAQ Capital Market nasdaq_tickers = [] for market_code in ["NMS", "NGM", "NCM"]: tickers = _fetch_exchange_tickers(market_code) logger.info(f" 나스닥 {market_code}: {len(tickers)}개 로드") nasdaq_tickers.extend(tickers) nasdaq_tickers = sorted(list(set(nasdaq_tickers))) # 뉴욕증권거래소(NYSE): NYQ nyse_tickers = _fetch_exchange_tickers("NYQ") logger.info(f" 뉴욕 NYQ: {len(nyse_tickers)}개 로드") all_tickers = sorted(list(set(nasdaq_tickers + nyse_tickers))) logger.info(f"나스닥: {len(nasdaq_tickers)}개, 뉴욕: {len(nyse_tickers)}개, 전체: {len(all_tickers)}개 로드 완료") return nasdaq_tickers, nyse_tickers, all_tickers except Exception as e: logger.error(f"야후 스크리너 티커 로드 실패: {e}") return [], [], [] def fetch_ticker_data(ticker, period="max", max_retries=3): """ 개별 티커의 기간별 일별 데이터를 야후 파이낸스에서 조회 - period: yfinance history 기간 파라미터 (예: max, 10y, 5y, 1y, 6mo, 3mo) - interval="1d" : 일별 데이터 """ def _parse_valid_periods(error_message): marker = "must be one of:" if marker not in error_message: return [] raw = error_message.split(marker, 1)[1] return [p.strip().strip("'").strip('"') for p in raw.split(",") if p.strip()] def _choose_fallback_period(requested_period, valid_periods): if not valid_periods: return None preferred_order = ["max", "10y", "5y", "2y", "1y", "6mo", "3mo", "1mo", "5d", "1d"] for candidate in preferred_order: if candidate in valid_periods: return candidate if requested_period in valid_periods: return requested_period return valid_periods[0] effective_period = period for attempt in range(max_retries): try: stock = yf.Ticker(ticker) # 기간별, 일별 데이터 조회 hist = stock.history(period=effective_period, interval="1d") if hist.empty: logger.warning(f"[{ticker}] 데이터 없음 (빈 결과)") return None # 인덱스(날짜)를 컬럼으로 변환 hist = hist.reset_index() # ticker 컬럼 추가 (나중에 티커별 구분용) hist["Ticker"] = ticker # 날짜 컬럼을 문자열로 변환 (데이터셋 호환성) if "Date" in hist.columns: hist["Date"] = hist["Date"].dt.strftime("%Y-%m-%d") elif "Datetime" in hist.columns: hist.rename(columns={"Datetime": "Date"}, inplace=True) hist["Date"] = pd.to_datetime(hist["Date"]).dt.strftime("%Y-%m-%d") # 필요한 컬럼만 선택 columns_to_keep = ["Ticker", "Date", "Open", "High", "Low", "Close", "Volume"] available_cols = [c for c in columns_to_keep if c in hist.columns] hist = hist[available_cols] # 숫자 컬럼 소수점 정리 numeric_cols = ["Open", "High", "Low", "Close"] for col in numeric_cols: if col in hist.columns: hist[col] = hist[col].round(4) # --- 장중 데이터(미확정 종가) 제외 로직 --- # zoneinfo는 Python 내장이라 별도 설치 불필요, 썸머타임(EDT/EST) 자동 처리 ny_tz = ZoneInfo("America/New_York") now_ny = datetime.now(ny_tz) today_ny = now_ny.strftime("%Y-%m-%d") # 정규장 마감: 뉴욕 현지 시간 16:00 (썸머타임이든 아니든 동일) # 여유를 두고 16:30 이후면 종가 확정으로 판단 market_closed = now_ny.hour >= 17 or (now_ny.hour == 16 and now_ny.minute >= 30) if not hist.empty and hist.iloc[-1]["Date"] == today_ny: if not market_closed: # 아직 장중이거나 마감 직후 → 오늘 데이터 제외 (종가 미확정) logger.info(f"[{ticker}] 장중 데이터({today_ny}) 제외 (현재 뉴욕시간: {now_ny.strftime('%H:%M')})") hist = hist.iloc[:-1] else: # 장 마감 후 → 오늘 종가 확정, 포함 logger.info(f"[{ticker}] 장 마감 후 데이터({today_ny}) 포함") # ----------------------------------------------- return hist except Exception as e: error_message = str(e) if "must be one of:" in error_message: valid_periods = _parse_valid_periods(error_message) fallback_period = _choose_fallback_period(effective_period, valid_periods) if fallback_period and fallback_period != effective_period: logger.info( f"[{ticker}] period '{effective_period}' 미지원, '{fallback_period}'로 자동 전환 후 재시도" ) effective_period = fallback_period continue logger.warning(f"[{ticker}] 조회 실패 (시도 {attempt + 1}/{max_retries}): {e}") if attempt < max_retries - 1: time.sleep(1) # 재시도 전 대기 continue return None def filter_last_30_days(df): """전체 데이터에서 티커별 최근 30일 데이터만 필터링""" if df.empty: return df df_copy = df.copy() df_copy["_date_parsed"] = pd.to_datetime(df_copy["Date"], errors="coerce") invalid_date_count = int(df_copy["_date_parsed"].isna().sum()) if invalid_date_count > 0: logger.warning(f"Date 파싱 실패 행 {invalid_date_count}개는 30일 필터에서 제외됩니다.") df_copy = df_copy[df_copy["_date_parsed"].notna()].copy() if df_copy.empty: return pd.DataFrame(columns=df.columns) max_date_by_ticker = df_copy.groupby("Ticker")["_date_parsed"].transform("max") cutoff_by_ticker = max_date_by_ticker - pd.Timedelta(days=30) result = df_copy[df_copy["_date_parsed"] >= cutoff_by_ticker].copy() if result.empty: return pd.DataFrame(columns=df.columns) result = result.reset_index(drop=True) result.drop(columns=["_date_parsed"], inplace=True) return result def upload_dataset_to_hf(df, repo_name, hf_token, max_retries=3, retry_wait_sec=2): """데이터프레임을 허깅페이스 데이터셋으로 업로드(재시도/진단 정보 포함)""" if df is None or df.empty: return { "ok": False, "repo": repo_name, "rows": 0, "attempts": 0, "elapsed_sec": 0.0, "error": "업로드할 데이터가 없습니다.", "traceback": "", } last_error = "" last_traceback = "" start_ts = time.time() for attempt in range(1, max_retries + 1): try: dataset = Dataset.from_pandas(df, preserve_index=False) dataset.push_to_hub( repo_name, token=hf_token, private=False # 공개 데이터셋 ) return { "ok": True, "repo": repo_name, "rows": len(df), "attempts": attempt, "elapsed_sec": time.time() - start_ts, "error": "", "traceback": "", } except Exception as e: last_error = str(e) last_traceback = traceback.format_exc() logger.warning(f"[{repo_name}] 업로드 실패 (시도 {attempt}/{max_retries}): {last_error}") if attempt < max_retries: time.sleep(retry_wait_sec * attempt) return { "ok": False, "repo": repo_name, "rows": len(df), "attempts": max_retries, "elapsed_sec": time.time() - start_ts, "error": last_error, "traceback": last_traceback, } def append_parquet_chunk_to_hf(df, repo_name, hf_token, subdir="data/chunks", max_retries=3, retry_wait_sec=2): """데이터프레임을 Parquet 청크 파일로 허깅페이스 데이터셋 저장소에 추가 업로드""" if df is None or df.empty: return { "ok": False, "repo": repo_name, "rows": 0, "attempts": 0, "elapsed_sec": 0.0, "error": "업로드할 데이터가 없습니다.", "traceback": "", } api = HfApi() last_error = "" last_traceback = "" start_ts = time.time() for attempt in range(1, max_retries + 1): temp_path = None try: api.create_repo( repo_id=repo_name, repo_type="dataset", token=hf_token, private=False, exist_ok=True, ) chunk_name = f"chunk-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{uuid.uuid4().hex[:8]}.parquet" with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp: temp_path = tmp.name df.to_parquet(temp_path, index=False) path_in_repo = f"{subdir}/{chunk_name}" api.upload_file( path_or_fileobj=temp_path, path_in_repo=path_in_repo, repo_id=repo_name, repo_type="dataset", token=hf_token, ) return { "ok": True, "repo": repo_name, "rows": len(df), "attempts": attempt, "elapsed_sec": time.time() - start_ts, "error": "", "traceback": "", } except Exception as e: last_error = str(e) last_traceback = traceback.format_exc() logger.warning(f"[{repo_name}] 청크 업로드 실패 (시도 {attempt}/{max_retries}): {last_error}") if attempt < max_retries: time.sleep(retry_wait_sec * attempt) finally: if temp_path and os.path.exists(temp_path): try: os.remove(temp_path) except Exception: pass return { "ok": False, "repo": repo_name, "rows": len(df), "attempts": max_retries, "elapsed_sec": time.time() - start_ts, "error": last_error, "traceback": last_traceback, } def run_pipeline( hf_token, all_dataset_name, recent_dataset_name, batch_size, period, checkpoint_batch_size, progress=gr.Progress() ): """ 전체 파이프라인 실행 1. 나스닥 & 뉴욕 티커 목록 가져오기 2. 티커별 야후 파이낸스 일별 데이터 수집 3. 전체기간 데이터셋 (all) 생성 4. 최근 30일 데이터셋 생성 """ if not hf_token: return "❌ 허깅페이스 토큰이 필요합니다. HF_TOKEN 환경변수 또는 입력창에 토큰을 넣어주세요." logs = [] try: def _df_stats(df, label): if df is None or df.empty: return f"{label}: 0행" mem_mb = df.memory_usage(deep=True).sum() / (1024 * 1024) return f"{label}: {len(df)}행 x {len(df.columns)}열, 메모리 약 {mem_mb:.2f}MB" def _append_upload_result(log_prefix, result): status_icon = "✅" if result["ok"] else "❌" logs.append( f" - {log_prefix}: {status_icon} {result['repo']} | " f"rows={result['rows']} | attempts={result['attempts']} | elapsed={result['elapsed_sec']:.1f}s" ) if not result["ok"]: logs.append(f" 오류: {result['error']}") if result["traceback"]: logs.append(" Traceback:") logs.append(result["traceback"]) logs.append("=" * 60) logs.append("📊 주식 데이터 수집 파이프라인 시작") logs.append(f"⏰ 시작 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") logs.append("=" * 60) logs.append("ℹ️ 메모리 절약 모드: 100개 단위 등 청크 업로드 후 버퍼를 즉시 비웁니다.") # ========== 1단계: 티커 목록 수집 ========== progress(0, desc="나스닥 & 뉴욕 티커 목록 수집 중...") logs.append("\n🔍 [1단계] 나스닥 & 뉴욕증권거래소 티커 목록 수집 중...") nasdaq_tickers, nyse_tickers, all_tickers = get_all_us_tickers() logs.append(f" - 나스닥: {len(nasdaq_tickers)}개") logs.append(f" - 뉴욕증권거래소: {len(nyse_tickers)}개") logs.append(f" - 전체: {len(all_tickers)}개") if not all_tickers: logs.append("\n⚠️ Yahoo Screener에서 티커를 가져오지 못했습니다. HF 데이터셋 기반 폴백을 시도합니다.") fallback_tickers = [] fallback_errors = [] try: recent_df = load_hf_dataset_as_df(recent_dataset_name, hf_token) fallback_tickers = sorted( recent_df["Ticker"].dropna().astype(str).str.upper().unique().tolist() ) if fallback_tickers: logs.append(f" - 폴백 소스: recent 데이터셋 ({recent_dataset_name})") except Exception as e: fallback_errors.append(f"recent 로드 실패: {e}") if not fallback_tickers: try: all_existing_df = load_hf_dataset_as_df(all_dataset_name, hf_token) fallback_tickers = sorted( all_existing_df["Ticker"].dropna().astype(str).str.upper().unique().tolist() ) if fallback_tickers: logs.append(f" - 폴백 소스: all 데이터셋 ({all_dataset_name})") except Exception as e: fallback_errors.append(f"all 로드 실패: {e}") if fallback_tickers: all_tickers = fallback_tickers logs.append(f" - 폴백 티커 수: {len(all_tickers)}개") else: if fallback_errors: logs.append(" - 폴백 실패 상세:") for err in fallback_errors: logs.append(f" * {err}") logs.append("\n가능한 원인:") logs.append(" 1) Yahoo Screener 일시 장애/차단") logs.append(" 2) 네트워크/지역 제한") logs.append(" 3) yfinance API 변경") return "\n".join(logs) + "\n\n❌ 티커 목록을 가져올 수 없습니다." # ========== 2단계: 기존 데이터셋 로드 + 재개 대상 계산 ========== progress(0.08, desc="기존 데이터셋 로드 중...") logs.append("\n📂 [2단계] 기존 데이터셋 로드 및 재개 대상 계산...") recent_for_resume = pd.DataFrame(columns=["Ticker"]) try: recent_for_resume = load_hf_dataset_as_df(recent_dataset_name, hf_token) logs.append(f" - 기존 30d 데이터: {len(recent_for_resume)}행") except Exception as e: logs.append(f" - 기존 30d 데이터 로드 실패(신규 수집 기준으로 진행): {e}") existing_tickers = set(recent_for_resume["Ticker"].dropna().astype(str).str.upper().tolist()) if not existing_tickers: logs.append(" - 기존 티커 정보가 비어 있어 전체 대상 기준으로 진행합니다.") pending_tickers = [ticker for ticker in all_tickers if ticker not in existing_tickers] logs.append(f" - 기존 수집 티커: {len(existing_tickers)}개") logs.append(f" - 이번 실행 대상 티커: {len(pending_tickers)}개") if not pending_tickers: progress(1.0, desc="완료!") logs.append("\n✅ 이미 수집된 티커입니다. 추가 수집할 대상이 없습니다.") return "\n".join(logs) # ========== 3단계: 야후 파이낸스 데이터 수집 ========== logs.append(f"\n📥 [3단계] 야후 파이낸스 데이터 수집 시작 (총 {len(pending_tickers)}개 티커)") logs.append(f" - 배치 크기: {batch_size}") logs.append(f" - 조회 기간(period): {period}") logs.append(f" - 체크포인트 업로드 간격: {checkpoint_batch_size}개 티커") logs.append(f" ⚠️ 반복문이라 오래 걸립니다. 전체 티커 수에 따라 수 시간 소요될 수 있습니다.") all_data_frames = [] recent_30d_frames = [] success_count = 0 fail_count = 0 last_checkpoint_success_index = 0 total = len(pending_tickers) def _upload_checkpoint(end_index): nonlocal last_checkpoint_success_index if success_count <= last_checkpoint_success_index: return logs.append( f"\n💾 [체크포인트] {end_index}/{total} 처리 시점 중간 업로드 시작 " f"(누적 성공 {success_count}개)" ) if not all_data_frames: return checkpoint_all_df = pd.concat(all_data_frames, ignore_index=True) checkpoint_recent_df = pd.concat(recent_30d_frames, ignore_index=True) logs.append(f" - {_df_stats(checkpoint_all_df, 'all 청크')}") logs.append(f" - {_df_stats(checkpoint_recent_df, '30d 청크')}") result_all_ckpt = append_parquet_chunk_to_hf( checkpoint_all_df, all_dataset_name, hf_token, subdir="data/chunks/all" ) result_30d_ckpt = append_parquet_chunk_to_hf( checkpoint_recent_df, recent_dataset_name, hf_token, subdir="data/chunks/30d" ) _append_upload_result("all 체크포인트", result_all_ckpt) _append_upload_result("30d 체크포인트", result_30d_ckpt) if not result_all_ckpt["ok"] or not result_30d_ckpt["ok"]: raise RuntimeError("체크포인트 업로드 실패로 파이프라인을 중단합니다.") all_data_frames.clear() recent_30d_frames.clear() gc.collect() last_checkpoint_success_index = success_count for i, ticker in enumerate(pending_tickers): # 진행률 업데이트 progress_pct = 0.1 + ((i + 1) / total) * 0.75 progress(progress_pct, desc=f"수집 중: {ticker} ({i + 1}/{total})") ticker_df = fetch_ticker_data(ticker, period=period) if ticker_df is not None and not ticker_df.empty: all_data_frames.append(ticker_df) recent_30d_frames.append(filter_last_30_days(ticker_df)) success_count += 1 else: fail_count += 1 # 배치 단위로 로그 출력 if (i + 1) % batch_size == 0 or (i + 1) == total: logs.append(f" 진행: {i + 1}/{total} (성공: {success_count}, 실패: {fail_count})") if checkpoint_batch_size > 0 and ((i + 1) % checkpoint_batch_size == 0): checkpoint_progress = min(0.89, max(progress_pct, 0.82)) progress(checkpoint_progress, desc=f"중간 업로드 중... ({i + 1}/{total})") _upload_checkpoint(i + 1) # API 호출 간 짧은 대기 (야후 차단 방지) if (i + 1) % 10 == 0: time.sleep(0.5) logs.append(f"\n📊 수집 완료: 성공 {success_count}개 / 실패 {fail_count}개") if success_count == 0: return "\n".join(logs) + "\n\n❌ 수집된 데이터가 없습니다." # ========== 4단계: 마지막 미반영 체크포인트 반영 ========== progress(0.9, desc="마지막 체크포인트 반영 중...") logs.append("\n🔧 [4단계] 마지막 미반영 데이터 반영 중...") if success_count > last_checkpoint_success_index: logs.append("\n💾 [최종반영] 중간 업로드 없이 누적된 데이터 반영") _upload_checkpoint(total) progress(0.97, desc="청크 업로드 상태 마무리 중...") logs.append("\n🚀 [5단계] 청크 업로드 모드 완료") logs.append(" - all/30d 모두 청크 파일 기준으로 저장되었습니다.") logs.append(" - 다음 실행 시 30d 티커 목록 기준으로 자동 스킵/재개됩니다.") # ========== 완료 ========== progress(1.0, desc="완료!") logs.append("\n" + "=" * 60) logs.append(f"✅ 파이프라인 완료!") logs.append(f"⏰ 종료 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") logs.append("=" * 60) return "\n".join(logs) except Exception as e: logger.exception("run_pipeline 실행 중 예외 발생") logs.append("\n" + "=" * 60) logs.append("❌ 파이프라인 실행 중 예외가 발생했습니다.") logs.append(f"오류 메시지: {e}") logs.append("\n[Traceback]") logs.append(traceback.format_exc()) logs.append("=" * 60) return "\n".join(logs) def preview_tickers(): """티커 목록 미리보기 (수집 전 확인용)""" nasdaq, nyse, combined = get_all_us_tickers() if not combined: return """❌ 티커 목록을 가져오지 못했습니다. 가능한 원인: 1) Yahoo Screener 일시 장애/차단 2) 네트워크/지역 제한 3) yfinance API 변경 잠시 후 다시 시도하거나, 파이프라인 실행 시 HF 데이터셋 폴백이 동작하는지 확인해 주세요. """ info = f"""📊 티커 목록 미리보기 ━━━━━━━━━━━━━━━━━━━━━ 나스닥: {len(nasdaq)}개 뉴욕증권거래소: {len(nyse)}개 전체: {len(combined)}개 나스닥 앞 20개: {', '.join(nasdaq[:20])}... 뉴욕 앞 20개: {', '.join(nyse[:20])}... """ return info # ========== Gradio UI 구성 ========== with gr.Blocks( title="주식 데이터셋 생성기" ) as demo: gr.Markdown(""" # 📈 나스닥 & 뉴욕증권거래소 주식 데이터셋 생성기 **야후 파이낸스에서 전체 티커의 일별 데이터를 수집하여 허깅페이스 데이터셋으로 자동 업로드합니다.** ### 파이프라인 흐름 1. 🔍 나스닥 & 뉴욕증권거래소 전체 티커 목록 수집 2. 📥 티커별 야후 파이낸스 일별 데이터 조회 (`period` 설정 가능) 3. 📦 **all 데이터셋** 생성 (전체기간 데이터) 4. 🗓️ 티커별 최근 30일 필터링 → **30일 데이터셋** 생성 5. 🚀 허깅페이스 허브에 업로드 > ⚠️ 전체 티커를 반복 조회하므로 **수 시간이 소요**될 수 있습니다. """) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### ⚙️ 설정") hf_token_input = gr.Textbox( label="허깅페이스 토큰 (HF_TOKEN)", value=HF_TOKEN, type="password", placeholder="hf_xxxxx...", info="Spaces 시크릿에 HF_TOKEN이 설정되어 있으면 자동으로 불러옵니다." ) all_dataset_input = gr.Textbox( label="전체기간 데이터셋 이름 (all)", value="younginpiniti/us-stocks-daily-all", placeholder="username/dataset-name", info="전체기간 일별 데이터가 저장될 허깅페이스 데이터셋" ) recent_dataset_input = gr.Textbox( label="최근 30일 데이터셋 이름 (30d)", value="younginpiniti/us-stocks-daily-30d", placeholder="username/dataset-name", info="최근 30일 일별 데이터가 저장될 허깅페이스 데이터셋" ) batch_size_input = gr.Slider( label="로그 출력 배치 크기", minimum=10, maximum=500, value=100, step=10, info="몇 개 티커마다 로그를 출력할지 설정" ) period_input = gr.Dropdown( label="조회 기간 (Yahoo period)", choices=["max", "10y", "5y", "2y", "1y", "6mo", "3mo", "1mo"], value="max", info="전체 수집 시간이 길면 10y/5y 등으로 줄여 실행할 수 있습니다." ) checkpoint_batch_input = gr.Dropdown( label="중간 업로드 간격 (티커 수)", choices=[0, 50, 100, 200, 500], value=100, info="0이면 중간 업로드 없이 마지막에만 업로드합니다." ) with gr.Row(): preview_btn = gr.Button("👀 티커 목록 미리보기", variant="secondary") start_btn = gr.Button("🚀 파이프라인 시작", variant="primary") realtime_btn = gr.Button("⚡ 실시간 시작", variant="secondary") with gr.Column(scale=2): gr.Markdown("### 📋 실행 로그") output_log = gr.Textbox( label="로그 출력", lines=30, max_lines=50, interactive=False ) # 이벤트 연결 preview_btn.click( fn=preview_tickers, inputs=[], outputs=[output_log] ) start_btn.click( fn=run_pipeline, inputs=[ hf_token_input, all_dataset_input, recent_dataset_input, batch_size_input, period_input, checkpoint_batch_input ], outputs=[output_log] ) realtime_btn.click( fn=run_realtime_update, inputs=[ hf_token_input, all_dataset_input, recent_dataset_input ], outputs=[output_log] ) gr.Markdown(""" --- ### 📌 데이터셋 구조 | 컬럼 | 설명 | 예시 | |------|------|------| | `Ticker` | 종목 티커 심볼 | AAPL, MSFT, TSLA | | `Date` | 거래일 (YYYY-MM-DD) | 2024-01-15 | | `Open` | 시가 | 185.3200 | | `High` | 고가 | 187.0400 | | `Low` | 저가 | 184.2100 | | `Close` | 종가 | 186.0000 | | `Volume` | 거래량 | 45123456 | > 💡 **팁**: 티커별로 전처리할 때는 `Ticker` 컬럼으로 그룹핑하면 됩니다. > ```python > from datasets import load_dataset > ds = load_dataset("younginpiniti/us-stocks-daily-all") > df = ds["train"].to_pandas() > aapl = df[df["Ticker"] == "AAPL"] > ``` """) if __name__ == "__main__": demo.launch(theme=gr.themes.Soft())