ykjung commited on
Commit
93ac31f
·
1 Parent(s): c90b031

feat: enhance run_pipeline to load existing datasets and calculate pending tickers for collection

Browse files
Files changed (3) hide show
  1. README.md +17 -8
  2. __pycache__/app.cpython-313.pyc +0 -0
  3. app.py +78 -27
README.md CHANGED
@@ -72,7 +72,10 @@ UI 연결:
72
  1. **티커 목록 수집**
73
  - `get_all_us_tickers()` 호출
74
  - 전체 대상 티커 개수 계산
75
- 2. **티커별 가격 데이터 수집**
 
 
 
76
  - 각 티커에 대해 `fetch_ticker_data(ticker)` 실행
77
  - `yf.Ticker(ticker).history(period=period, interval="1d")` 호출
78
  - 컬럼 정리: `Ticker`, `Date`, `Open`, `High`, `Low`, `Close`, `Volume`
@@ -82,16 +85,17 @@ UI 연결:
82
  - 마감 확정 전(today) 데이터는 제외
83
  - 실패 시 최대 3회 재시도
84
  - 10티커마다 0.5초 대기(호출 차단 완화)
85
- - 설정한 간격(`checkpoint_batch_size`)마다 중간 업로드(체크포인트) 수행 가능
86
- 3. **전체 데이터 병합**
87
- - 수집 성공한 티커 데이터프레임을 `concat`
88
- 4. **최근 30일 데이터셋 생성**
89
- - 티커별로 미리 계산한 30일 데이터들을 병합
90
- 5. **Hugging Face 드**
 
91
  - `upload_dataset_to_hf(all_df, all_dataset_name, hf_token)`
92
  - `upload_dataset_to_hf(recent_30d_df, recent_dataset_name, hf_token)`
93
  - 내부적으로 `Dataset.from_pandas(...).push_to_hub(...)` 사용
94
- 6. **완료 로그 반환**
95
  - 시작/종료 시간, 성공/실패 개수, 업로드 결과를 텍스트로 반환
96
 
97
  진행률(Progress Bar):
@@ -162,6 +166,11 @@ UI 연결:
162
  - 예: `100`이면 100개 티커 처리마다 중간 업로드
163
  - `0`이면 중간 업로드 없이 마지막에 한 번만 업로드
164
 
 
 
 
 
 
165
  ---
166
 
167
  ## 참고
 
72
  1. **티커 목록 수집**
73
  - `get_all_us_tickers()` 호출
74
  - 전체 대상 티커 개수 계산
75
+ 2. **기존 데이터 로드 + 재개 대상 계산**
76
+ - 기존 `all`, `30d` 데이터셋 로드 시도
77
+ - 이미 수집된 티커를 제외하고 이번 실행 대상 티커만 계산
78
+ 3. **티커별 가격 데이터 수집**
79
  - 각 티커에 대해 `fetch_ticker_data(ticker)` 실행
80
  - `yf.Ticker(ticker).history(period=period, interval="1d")` 호출
81
  - 컬럼 정리: `Ticker`, `Date`, `Open`, `High`, `Low`, `Close`, `Volume`
 
85
  - 마감 확정 전(today) 데이터는 제외
86
  - 실패 시 최대 3회 재시도
87
  - 10티커마다 0.5초 대기(호출 차단 완화)
88
+ - 설정한 간격(`checkpoint_batch_size`)마다 기존 데이터셋과 병합 후 중간 업로드(체크포인트) 수행
89
+ 4. **최종 병합 반영**
90
+ - 마지막 미반영 버퍼를 기존 데이터셋에 병합
91
+ - `Ticker + Date` 기준 중복 제거(최신값 유지)
92
+ 5. **최근 30일 데이터 갱신**
93
+ - 병합된 데이터 기준으 티커별 30일 윈도우 유지
94
+ 6. **Hugging Face 업로드**
95
  - `upload_dataset_to_hf(all_df, all_dataset_name, hf_token)`
96
  - `upload_dataset_to_hf(recent_30d_df, recent_dataset_name, hf_token)`
97
  - 내부적으로 `Dataset.from_pandas(...).push_to_hub(...)` 사용
98
+ 7. **완료 로그 반환**
99
  - 시작/종료 시간, 성공/실패 개수, 업로드 결과를 텍스트로 반환
100
 
101
  진행률(Progress Bar):
 
166
  - 예: `100`이면 100개 티커 처리마다 중간 업로드
167
  - `0`이면 중간 업로드 없이 마지막에 한 번만 업로드
168
 
169
+ 재실행(Resume) 동작:
170
+
171
+ - 이미 데이터셋에 존재하는 티커는 자동으로 스킵합니다.
172
+ - 이전 실행에서 중간에 실패해도, 다음 실행 시 남은 티커 위주로 이어서 수집합니다.
173
+
174
  ---
175
 
176
  ## 참고
__pycache__/app.cpython-313.pyc CHANGED
Binary files a/__pycache__/app.cpython-313.pyc and b/__pycache__/app.cpython-313.pyc differ
 
app.py CHANGED
@@ -16,6 +16,7 @@ import time
16
  import logging
17
  import json
18
  import traceback
 
19
  from urllib.request import Request, urlopen
20
 
21
  # 로깅 설정
@@ -483,6 +484,16 @@ def run_pipeline(
483
 
484
  logs = []
485
  try:
 
 
 
 
 
 
 
 
 
 
486
  logs.append("=" * 60)
487
  logs.append("📊 주식 데이터 수집 파이프라인 시작")
488
  logs.append(f"⏰ 시작 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
@@ -541,8 +552,41 @@ def run_pipeline(
541
 
542
  return "\n".join(logs) + "\n\n❌ 티커 목록을 가져올 수 없습니다."
543
 
544
- # ========== 2단계: 야후 파이낸스 데이터 수집 ==========
545
- logs.append(f"\n📥 [2단계] 야후 파이낸스 데이터 수집 시작 (총 {len(all_tickers)}개 티커)")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
546
  logs.append(f" - 배치 크기: {batch_size}")
547
  logs.append(f" - 조회 기간(period): {period}")
548
  logs.append(f" - 체크포인트 업로드 간격: {checkpoint_batch_size}개 티커")
@@ -553,10 +597,10 @@ def run_pipeline(
553
  success_count = 0
554
  fail_count = 0
555
  last_checkpoint_success_index = 0
556
- total = len(all_tickers)
557
 
558
  def _upload_checkpoint(end_index):
559
- nonlocal last_checkpoint_success_index
560
 
561
  if success_count <= last_checkpoint_success_index:
562
  return
@@ -566,20 +610,31 @@ def run_pipeline(
566
  f"(누적 성공 {success_count}개)"
567
  )
568
 
 
 
 
569
  checkpoint_all_df = pd.concat(all_data_frames, ignore_index=True)
570
  checkpoint_recent_df = pd.concat(recent_30d_frames, ignore_index=True)
571
 
572
- result_all_ckpt = upload_dataset_to_hf(checkpoint_all_df, all_dataset_name, hf_token)
573
- result_30d_ckpt = upload_dataset_to_hf(checkpoint_recent_df, recent_dataset_name, hf_token)
 
 
 
 
574
 
575
  logs.append(f" - all 체크포인트: {result_all_ckpt}")
576
  logs.append(f" - 30d 체크포인트: {result_30d_ckpt}")
577
 
 
 
 
 
578
  last_checkpoint_success_index = success_count
579
 
580
- for i, ticker in enumerate(all_tickers):
581
  # 진행률 업데이트
582
- progress_pct = (i + 1) / total
583
  progress(progress_pct, desc=f"수집 중: {ticker} ({i + 1}/{total})")
584
 
585
  ticker_df = fetch_ticker_data(ticker, period=period)
@@ -606,41 +661,37 @@ def run_pipeline(
606
 
607
  logs.append(f"\n📊 수집 완료: 성공 {success_count}개 / 실패 {fail_count}개")
608
 
609
- if not all_data_frames:
610
  return "\n".join(logs) + "\n\n❌ 수집된 데이터가 없습니다."
611
 
612
- # ========== 3단계: 데이터 합치기 ==========
613
- progress(0.9, desc="데이터 병합 중...")
614
- logs.append("\n🔧 [3단계] 티커별 범위 데이터 병합 중...")
615
-
616
- all_df = pd.concat(all_data_frames, ignore_index=True)
617
- logs.append(f" - 전체 데이터: {len(all_df)}행 x {len(all_df.columns)}열")
618
- logs.append(f" - 고유 티커 수: {all_df['Ticker'].nunique()}")
619
-
620
- # ========== 4단계: 30일 데이터 병합 ==========
621
- progress(0.93, desc="최근 30일 데이터 병합 중...")
622
- logs.append("\n🗓️ [4단계] 티커별 최근 30일 데이터 병합 중...")
623
-
624
- recent_30d_df = pd.concat(recent_30d_frames, ignore_index=True)
625
- progress(0.96, desc="최근 30일 데이터 병합 완료")
626
- logs.append(f" - 30일 데이터: {len(recent_30d_df)}행 x {len(recent_30d_df.columns)}열")
627
- logs.append(f" - 고유 티커 수: {recent_30d_df['Ticker'].nunique()}")
628
 
629
  if checkpoint_batch_size > 0 and success_count > last_checkpoint_success_index:
630
  logs.append("\n💾 [체크포인트] 마지막 미반영 구간 업로드")
631
  _upload_checkpoint(total)
632
 
 
 
 
 
 
 
 
 
 
633
  # ========== 5단계: 허깅페이스 데이터셋 업로드 ==========
634
  progress(0.97, desc="all 데이터셋 업로드 중...")
635
  logs.append("\n🚀 [5단계] 허깅페이스 데이터셋 업로드 중...")
636
 
637
  # all 데이터셋 업로드
638
- result_all = upload_dataset_to_hf(all_df, all_dataset_name, hf_token)
639
  logs.append(f" {result_all}")
640
 
641
  # 30일 데이터셋 업로드
642
  progress(0.99, desc="30d 데이터셋 업로드 중...")
643
- result_30d = upload_dataset_to_hf(recent_30d_df, recent_dataset_name, hf_token)
644
  logs.append(f" {result_30d}")
645
 
646
  # ========== 완료 ==========
 
16
  import logging
17
  import json
18
  import traceback
19
+ import gc
20
  from urllib.request import Request, urlopen
21
 
22
  # 로깅 설정
 
484
 
485
  logs = []
486
  try:
487
+ def _merge_rows(base_df, incoming_df):
488
+ if incoming_df is None or incoming_df.empty:
489
+ return base_df
490
+ if base_df is None or base_df.empty:
491
+ return incoming_df.reset_index(drop=True)
492
+
493
+ merged = pd.concat([base_df, incoming_df], ignore_index=True)
494
+ merged = merged.drop_duplicates(subset=["Ticker", "Date"], keep="last")
495
+ return merged.reset_index(drop=True)
496
+
497
  logs.append("=" * 60)
498
  logs.append("📊 주식 데이터 수집 파이프라인 시작")
499
  logs.append(f"⏰ 시작 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
 
552
 
553
  return "\n".join(logs) + "\n\n❌ 티커 목록을 가져올 수 없습니다."
554
 
555
+ # ========== 2단계: 기존 데이터 로드 + 재개 대상 계산 ==========
556
+ progress(0.08, desc="기존 데이터 로드 중...")
557
+ logs.append("\n📂 [2단계] 기존 데이터셋 로드 및 재개 대상 계산...")
558
+
559
+ base_all_df = pd.DataFrame(columns=["Ticker", "Date", "Open", "High", "Low", "Close", "Volume"])
560
+ base_recent_df = pd.DataFrame(columns=["Ticker", "Date", "Open", "High", "Low", "Close", "Volume"])
561
+
562
+ try:
563
+ base_all_df = load_hf_dataset_as_df(all_dataset_name, hf_token)
564
+ logs.append(f" - 기존 all 데이터: {len(base_all_df)}행")
565
+ except Exception as e:
566
+ logs.append(f" - 기존 all 데이터 로드 실패(신규 생성으로 진행): {e}")
567
+
568
+ try:
569
+ base_recent_df = load_hf_dataset_as_df(recent_dataset_name, hf_token)
570
+ logs.append(f" - 기존 30d 데이터: {len(base_recent_df)}행")
571
+ except Exception as e:
572
+ logs.append(f" - 기존 30d 데이터 로드 실패(신규 생성으로 진행): {e}")
573
+
574
+ existing_tickers = set(base_all_df["Ticker"].dropna().astype(str).str.upper().tolist())
575
+ if not existing_tickers:
576
+ existing_tickers = set(base_recent_df["Ticker"].dropna().astype(str).str.upper().tolist())
577
+
578
+ pending_tickers = [ticker for ticker in all_tickers if ticker not in existing_tickers]
579
+
580
+ logs.append(f" - 기존 수집 티커: {len(existing_tickers)}개")
581
+ logs.append(f" - 이번 실행 대상 티커: {len(pending_tickers)}개")
582
+
583
+ if not pending_tickers:
584
+ progress(1.0, desc="완료!")
585
+ logs.append("\n✅ 이미 수집된 티커입니다. 추가 수집할 대상이 없습니다.")
586
+ return "\n".join(logs)
587
+
588
+ # ========== 3단계: 야후 파이낸스 데이터 수집 ==========
589
+ logs.append(f"\n📥 [3단계] 야후 파이낸스 데이터 수집 시작 (총 {len(pending_tickers)}개 티커)")
590
  logs.append(f" - 배치 크기: {batch_size}")
591
  logs.append(f" - 조회 기간(period): {period}")
592
  logs.append(f" - 체크포인트 업로드 간격: {checkpoint_batch_size}개 티커")
 
597
  success_count = 0
598
  fail_count = 0
599
  last_checkpoint_success_index = 0
600
+ total = len(pending_tickers)
601
 
602
  def _upload_checkpoint(end_index):
603
+ nonlocal last_checkpoint_success_index, base_all_df, base_recent_df
604
 
605
  if success_count <= last_checkpoint_success_index:
606
  return
 
610
  f"(누적 성공 {success_count}개)"
611
  )
612
 
613
+ if not all_data_frames:
614
+ return
615
+
616
  checkpoint_all_df = pd.concat(all_data_frames, ignore_index=True)
617
  checkpoint_recent_df = pd.concat(recent_30d_frames, ignore_index=True)
618
 
619
+ base_all_df = _merge_rows(base_all_df, checkpoint_all_df)
620
+ base_recent_df = _merge_rows(base_recent_df, checkpoint_recent_df)
621
+ base_recent_df = filter_last_30_days(base_recent_df)
622
+
623
+ result_all_ckpt = upload_dataset_to_hf(base_all_df, all_dataset_name, hf_token)
624
+ result_30d_ckpt = upload_dataset_to_hf(base_recent_df, recent_dataset_name, hf_token)
625
 
626
  logs.append(f" - all 체크포인트: {result_all_ckpt}")
627
  logs.append(f" - 30d 체크포인트: {result_30d_ckpt}")
628
 
629
+ all_data_frames.clear()
630
+ recent_30d_frames.clear()
631
+ gc.collect()
632
+
633
  last_checkpoint_success_index = success_count
634
 
635
+ for i, ticker in enumerate(pending_tickers):
636
  # 진행률 업데이트
637
+ progress_pct = 0.1 + ((i + 1) / total) * 0.75
638
  progress(progress_pct, desc=f"수집 중: {ticker} ({i + 1}/{total})")
639
 
640
  ticker_df = fetch_ticker_data(ticker, period=period)
 
661
 
662
  logs.append(f"\n📊 수집 완료: 성공 {success_count}개 / 실패 {fail_count}개")
663
 
664
+ if success_count == 0:
665
  return "\n".join(logs) + "\n\n❌ 수집된 데이터가 없습니다."
666
 
667
+ # ========== 4단계: 마지막 미반영 체크포인트 반영 ==========
668
+ progress(0.9, desc="마지막 체크포인트 반영 중...")
669
+ logs.append("\n🔧 [4단계] 마지막 미반영 데이터 반영 중...")
 
 
 
 
 
 
 
 
 
 
 
 
 
670
 
671
  if checkpoint_batch_size > 0 and success_count > last_checkpoint_success_index:
672
  logs.append("\n💾 [체크포인트] 마지막 미반영 구간 업로드")
673
  _upload_checkpoint(total)
674
 
675
+ if checkpoint_batch_size == 0 and all_data_frames:
676
+ logs.append("\n💾 [최종반영] 중간 업로드 없이 누적된 데이터 반영")
677
+ _upload_checkpoint(total)
678
+
679
+ logs.append(f" - 전체 데이터: {len(base_all_df)}행 x {len(base_all_df.columns)}열")
680
+ logs.append(f" - 고유 티커 수: {base_all_df['Ticker'].nunique()}")
681
+ logs.append(f" - 30일 데이터: {len(base_recent_df)}행 x {len(base_recent_df.columns)}열")
682
+ logs.append(f" - 30일 고유 티커 수: {base_recent_df['Ticker'].nunique()}")
683
+
684
  # ========== 5단계: 허깅페이스 데이터셋 업로드 ==========
685
  progress(0.97, desc="all 데이터셋 업로드 중...")
686
  logs.append("\n🚀 [5단계] 허깅페이스 데이터셋 업로드 중...")
687
 
688
  # all 데이터셋 업로드
689
+ result_all = upload_dataset_to_hf(base_all_df, all_dataset_name, hf_token)
690
  logs.append(f" {result_all}")
691
 
692
  # 30일 데이터셋 업로드
693
  progress(0.99, desc="30d 데이터셋 업로드 중...")
694
+ result_30d = upload_dataset_to_hf(base_recent_df, recent_dataset_name, hf_token)
695
  logs.append(f" {result_30d}")
696
 
697
  # ========== 완료 ==========