superxu520 commited on
Commit
2d86dae
·
1 Parent(s): c636e4b

"fix:reduce-thread-count-and-add-exponential-backoff"

Browse files
Files changed (1) hide show
  1. sync_data.py +46 -22
sync_data.py CHANGED
@@ -49,14 +49,14 @@ def get_thread_config():
49
  import multiprocessing
50
  cpu_count = multiprocessing.cpu_count()
51
 
52
- # 分层并发策略
53
  config = {
54
- 'daily': _safe_int_env("MAX_WORKERS_DAILY", cpu_count * 4),
55
- 'fund': _safe_int_env("MAX_WORKERS_FUND", cpu_count * 3),
56
- 'valuation': _safe_int_env("MAX_WORKERS_VALUATION", cpu_count * 3),
57
- 'margin': _safe_int_env("MAX_WORKERS_MARGIN", cpu_count * 3),
58
- 'financial': _safe_int_env("MAX_WORKERS_FINANCIAL", cpu_count * 2),
59
- 'dividend': _safe_int_env("MAX_WORKERS_DIVIDEND", cpu_count * 2),
60
  }
61
 
62
  # 向后兼容
@@ -94,8 +94,9 @@ def get_stock_list() -> pd.DataFrame:
94
  logger.info("Fetching all-market target list...")
95
  all_lists = []
96
 
97
- # A股列表获取(带重试)
98
- max_retries = 3
 
99
  for attempt in range(max_retries):
100
  try:
101
  df_a = ak.stock_zh_a_spot_em()[['代码', '名称']]
@@ -108,8 +109,9 @@ def get_stock_list() -> pd.DataFrame:
108
  if attempt == max_retries - 1:
109
  logger.error(f"Failed to fetch A-stock list after {max_retries} attempts: {e}")
110
  else:
111
- logger.warning(f"Attempt {attempt + 1} failed, retrying... Error: {e}")
112
- time.sleep(5) # 重试前等待 5
 
113
 
114
  # ETF (增加容错)
115
  try:
@@ -164,9 +166,15 @@ def get_stock_list() -> pd.DataFrame:
164
 
165
  def get_target_daily(code: str, start_date: str, market: str) -> Optional[pd.DataFrame]:
166
  """抓取单只标的数据"""
167
- max_retries = 3
 
 
168
  for attempt in range(max_retries):
169
  try:
 
 
 
 
170
  end_date = get_beijing_time().strftime('%Y%m%d')
171
  fetch_start = start_date.replace('-', '')
172
  df = None
@@ -369,7 +377,8 @@ def sync_stock_daily(targets: List[Dict[str, str]], last_trade_day: str) -> int:
369
 
370
  def get_stock_fund_flow(code: str, market: str) -> Optional[pd.DataFrame]:
371
  """获取单只股票资金流向数据"""
372
- max_retries = 3
 
373
  for attempt in range(max_retries):
374
  try:
375
  # 确定 market 参数
@@ -410,7 +419,10 @@ def get_stock_fund_flow(code: str, market: str) -> Optional[pd.DataFrame]:
410
  except Exception as e:
411
  if attempt == max_retries - 1:
412
  pass # 静默失败,很多股票可能没有资金流向数据
413
- time.sleep(0.5)
 
 
 
414
  return None
415
 
416
 
@@ -499,7 +511,8 @@ def sync_fund_flow(targets: List[Dict[str, str]], last_trade_day: str) -> int:
499
 
500
  def get_stock_valuation(code: str) -> Optional[pd.DataFrame]:
501
  """获取单只股票估值指标数据"""
502
- max_retries = 3
 
503
  for attempt in range(max_retries):
504
  try:
505
  df = ak.stock_a_lg_indicator(symbol=code)
@@ -525,7 +538,9 @@ def get_stock_valuation(code: str) -> Optional[pd.DataFrame]:
525
  available_cols = [c for c in cols if c in df.columns]
526
  return df[available_cols]
527
  except Exception:
528
- time.sleep(0.5)
 
 
529
  return None
530
 
531
 
@@ -612,7 +627,8 @@ def sync_valuation(targets: List[Dict[str, str]], last_trade_day: str) -> int:
612
 
613
  def get_stock_margin(code: str) -> Optional[pd.DataFrame]:
614
  """获取单只股票融资融券数据"""
615
- max_retries = 3
 
616
  for attempt in range(max_retries):
617
  try:
618
  # 尝试上交所
@@ -643,7 +659,9 @@ def get_stock_margin(code: str) -> Optional[pd.DataFrame]:
643
  except Exception as e:
644
  if attempt == max_retries - 1:
645
  pass
646
- time.sleep(0.5)
 
 
647
  return None
648
 
649
 
@@ -730,7 +748,8 @@ def sync_margin(targets: List[Dict[str, str]], last_trade_day: str) -> int:
730
 
731
  def get_stock_financial_indicator(code: str) -> Optional[pd.DataFrame]:
732
  """获取单只股票财务指标数据"""
733
- max_retries = 3
 
734
  for attempt in range(max_retries):
735
  try:
736
  df = ak.stock_financial_analysis_indicator(symbol=code)
@@ -760,7 +779,9 @@ def get_stock_financial_indicator(code: str) -> Optional[pd.DataFrame]:
760
  available_cols = [c for c in cols if c in df.columns]
761
  return df[available_cols]
762
  except Exception:
763
- time.sleep(0.5)
 
 
764
  return None
765
 
766
 
@@ -890,7 +911,8 @@ def sync_holder_num() -> int:
890
 
891
  def get_stock_dividend(code: str) -> Optional[pd.DataFrame]:
892
  """获取单只股票分红数据"""
893
- max_retries = 3
 
894
  for attempt in range(max_retries):
895
  try:
896
  df = ak.stock_history_dividend(symbol=code)
@@ -912,7 +934,9 @@ def get_stock_dividend(code: str) -> Optional[pd.DataFrame]:
912
  available_cols = [c for c in cols if c in df.columns]
913
  return df[available_cols]
914
  except Exception:
915
- time.sleep(0.5)
 
 
916
  return None
917
 
918
 
 
49
  import multiprocessing
50
  cpu_count = multiprocessing.cpu_count()
51
 
52
+ # 分层并发策略(保守配置,避免 API 限流)
53
  config = {
54
+ 'daily': _safe_int_env("MAX_WORKERS_DAILY", max(2, cpu_count * 2)),
55
+ 'fund': _safe_int_env("MAX_WORKERS_FUND", max(2, cpu_count)),
56
+ 'valuation': _safe_int_env("MAX_WORKERS_VALUATION", max(2, cpu_count)),
57
+ 'margin': _safe_int_env("MAX_WORKERS_MARGIN", max(2, cpu_count)),
58
+ 'financial': _safe_int_env("MAX_WORKERS_FINANCIAL", max(1, cpu_count)),
59
+ 'dividend': _safe_int_env("MAX_WORKERS_DIVIDEND", max(1, cpu_count)),
60
  }
61
 
62
  # 向后兼容
 
94
  logger.info("Fetching all-market target list...")
95
  all_lists = []
96
 
97
+ # A股列表获取(带重试和指数退避
98
+ max_retries = 5
99
+ base_delay = 2.0
100
  for attempt in range(max_retries):
101
  try:
102
  df_a = ak.stock_zh_a_spot_em()[['代码', '名称']]
 
109
  if attempt == max_retries - 1:
110
  logger.error(f"Failed to fetch A-stock list after {max_retries} attempts: {e}")
111
  else:
112
+ delay = base_delay * (2 ** attempt)
113
+ logger.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s... Error: {e}")
114
+ time.sleep(delay)
115
 
116
  # ETF (增加容错)
117
  try:
 
166
 
167
  def get_target_daily(code: str, start_date: str, market: str) -> Optional[pd.DataFrame]:
168
  """抓取单只标的数据"""
169
+ max_retries = 5
170
+ base_delay = 1.0 # 寂础延迟秒
171
+
172
  for attempt in range(max_retries):
173
  try:
174
+ # 指数退避:每次重试增加延迟
175
+ if attempt > 0:
176
+ delay = base_delay * (2 ** attempt) # 1s, 2s, 4s, 8s
177
+ time.sleep(delay)
178
  end_date = get_beijing_time().strftime('%Y%m%d')
179
  fetch_start = start_date.replace('-', '')
180
  df = None
 
377
 
378
  def get_stock_fund_flow(code: str, market: str) -> Optional[pd.DataFrame]:
379
  """获取单只股票资金流向数据"""
380
+ max_retries = 5
381
+ base_delay = 1.0
382
  for attempt in range(max_retries):
383
  try:
384
  # 确定 market 参数
 
419
  except Exception as e:
420
  if attempt == max_retries - 1:
421
  pass # 静默失败,很多股票可能没有资金流向数据
422
+ # 指数退避:每次重试增加延迟
423
+ if attempt > 0:
424
+ delay = base_delay * (2 ** attempt)
425
+ time.sleep(delay)
426
  return None
427
 
428
 
 
511
 
512
  def get_stock_valuation(code: str) -> Optional[pd.DataFrame]:
513
  """获取单只股票估值指标数据"""
514
+ max_retries = 5
515
+ base_delay = 1.0
516
  for attempt in range(max_retries):
517
  try:
518
  df = ak.stock_a_lg_indicator(symbol=code)
 
538
  available_cols = [c for c in cols if c in df.columns]
539
  return df[available_cols]
540
  except Exception:
541
+ if attempt > 0:
542
+ delay = base_delay * (2 ** attempt)
543
+ time.sleep(delay)
544
  return None
545
 
546
 
 
627
 
628
  def get_stock_margin(code: str) -> Optional[pd.DataFrame]:
629
  """获取单只股票融资融券数据"""
630
+ max_retries = 5
631
+ base_delay = 1.0
632
  for attempt in range(max_retries):
633
  try:
634
  # 尝试上交所
 
659
  except Exception as e:
660
  if attempt == max_retries - 1:
661
  pass
662
+ if attempt > 0:
663
+ delay = base_delay * (2 ** attempt)
664
+ time.sleep(delay)
665
  return None
666
 
667
 
 
748
 
749
  def get_stock_financial_indicator(code: str) -> Optional[pd.DataFrame]:
750
  """获取单只股票财务指标数据"""
751
+ max_retries = 5
752
+ base_delay = 1.0
753
  for attempt in range(max_retries):
754
  try:
755
  df = ak.stock_financial_analysis_indicator(symbol=code)
 
779
  available_cols = [c for c in cols if c in df.columns]
780
  return df[available_cols]
781
  except Exception:
782
+ if attempt > 0:
783
+ delay = base_delay * (2 ** attempt)
784
+ time.sleep(delay)
785
  return None
786
 
787
 
 
911
 
912
  def get_stock_dividend(code: str) -> Optional[pd.DataFrame]:
913
  """获取单只股票分红数据"""
914
+ max_retries = 5
915
+ base_delay = 1.0
916
  for attempt in range(max_retries):
917
  try:
918
  df = ak.stock_history_dividend(symbol=code)
 
934
  available_cols = [c for c in cols if c in df.columns]
935
  return df[available_cols]
936
  except Exception:
937
+ if attempt > 0:
938
+ delay = base_delay * (2 ** attempt)
939
+ time.sleep(delay)
940
  return None
941
 
942