superxu520 commited on
Commit
3d1ce67
·
1 Parent(s): 91985a9

"fix:optimize-daily-sync-global-watermark"

Browse files
Files changed (1) hide show
  1. sync_data.py +70 -12
sync_data.py CHANGED
@@ -471,21 +471,79 @@ def get_index_daily(code: str) -> Optional[pd.DataFrame]:
471
 
472
 
473
  def sync_stock_daily(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[str, Any]:
474
- """增量同步逻辑,返回详细结果"""
475
- db = get_db()
476
 
477
- # 获取现有数据的最新日期
478
- existing_latest = db.conn.execute("SELECT code, CAST(MAX(trade_date) AS VARCHAR) FROM stock_daily GROUP BY code").fetchall()
479
- latest_map = {row[0]: row[1] for row in existing_latest}
480
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
481
  pending = []
482
- for t in targets:
483
- code = t['code']
484
- if code in latest_map:
485
- if latest_map[code] >= last_trade_day: continue
486
- start_dt = (pd.to_datetime(latest_map[code]) + timedelta(days=1)).strftime('%Y-%m-%d')
487
- else:
488
- start_dt = (get_beijing_time() - timedelta(days=YEARS_OF_DATA * 365)).strftime('%Y-%m-%d')
489
  t['start_dt'] = start_dt
490
  pending.append(t)
491
 
 
471
 
472
 
473
  def sync_stock_daily(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[str, Any]:
474
+ """增量同步逻辑,返回详细结果(采用全局水位线机制)"""
475
+ logger.info("Syncing daily data...")
476
 
477
+ # 1. 扫描本地 parquet 文件获取全局最新日期(类似其他指标)
478
+ parquet_dir = Path("/tmp/data/parquet")
479
+ parquet_dir.mkdir(parents=True, exist_ok=True)
480
 
481
+ global_latest_date = "2000-01-01"
482
+ existing_codes = set()
483
+
484
+ for f in parquet_dir.glob("*.parquet"):
485
+ if f.name.startswith('index_'): # 跳过指数文件
486
+ continue
487
+ try:
488
+ df = pd.read_parquet(f)
489
+ if not df.empty and 'trade_date' in df.columns:
490
+ max_date = df['trade_date'].max()
491
+ if isinstance(max_date, pd.Timestamp):
492
+ max_date = max_date.strftime('%Y-%m-%d')
493
+ if max_date > global_latest_date:
494
+ global_latest_date = max_date
495
+ existing_codes.update(df['code'].unique())
496
+ except Exception:
497
+ pass
498
+
499
+ # 2. 如果本地没有数据,尝试从云端下载最近3个月作为基准
500
+ if global_latest_date == "2000-01-01":
501
+ repo_id = os.getenv("DATASET_REPO_ID")
502
+ if repo_id:
503
+ try:
504
+ files = list_repo_files(repo_id=repo_id, repo_type="dataset")
505
+ parquet_files = sorted([f for f in files if f.startswith("data/parquet/") and f.endswith(".parquet")])
506
+
507
+ # 下载最近3个月的数据作为基准
508
+ for pf in parquet_files[-3:]:
509
+ try:
510
+ local_file = hf_hub_download(repo_id=repo_id, filename=pf, repo_type="dataset")
511
+ df = pd.read_parquet(local_file)
512
+ if not df.empty and 'trade_date' in df.columns:
513
+ max_date = df['trade_date'].max()
514
+ if isinstance(max_date, pd.Timestamp):
515
+ max_date = max_date.strftime('%Y-%m-%d')
516
+ if max_date > global_latest_date:
517
+ global_latest_date = max_date
518
+ existing_codes.update(df['code'].unique())
519
+ except Exception:
520
+ pass
521
+ logger.info(f"Downloaded daily data from cloud, latest date: {global_latest_date}")
522
+ except Exception as e:
523
+ logger.info(f"No existing daily data in cloud: {e}")
524
+
525
+ # 3. 区分新股和存量股票
526
+ new_codes = [t for t in targets if t['code'] not in existing_codes]
527
+
528
+ # 4. 全局水位线拦截
529
+ if global_latest_date >= last_trade_day and not new_codes:
530
+ logger.info(f"Daily data is already up to date ({global_latest_date}) and no new stocks. Skip.")
531
+ return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'message': f'Already up to date ({global_latest_date})'}
532
+
533
+ # 5. 确定同步策略
534
+ if global_latest_date >= last_trade_day:
535
+ logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.")
536
+ sync_targets = new_codes
537
+ # 新股只获取最近1年数据(而非10年)
538
+ start_dt = (get_beijing_time() - timedelta(days=365)).strftime('%Y-%m-%d')
539
+ else:
540
+ logger.info(f"Syncing daily data from {global_latest_date} to {last_trade_day}...")
541
+ sync_targets = targets
542
+ start_dt = (pd.to_datetime(global_latest_date) + timedelta(days=1)).strftime('%Y-%m-%d')
543
+
544
+ # 设置每只股票的start_dt
545
  pending = []
546
+ for t in sync_targets:
 
 
 
 
 
 
547
  t['start_dt'] = start_dt
548
  pending.append(t)
549