fengwm commited on
Commit
72f5e10
·
1 Parent(s): 2a8a0f5

fix: add retries and timeout for akshare data fetch

Browse files
Files changed (1) hide show
  1. data_fetcher.py +37 -7
data_fetcher.py CHANGED
@@ -1,5 +1,7 @@
1
  from datetime import datetime, timedelta
 
2
  import threading
 
3
 
4
  import akshare as ak
5
  import pandas as pd
@@ -8,6 +10,9 @@ _TRADE_CALENDAR_CACHE: pd.DatetimeIndex | None = None
8
  _TRADE_CALENDAR_CACHED_AT: datetime | None = None
9
  _TRADE_CALENDAR_CACHE_TTL = timedelta(hours=12)
10
  _TRADE_CALENDAR_LOCK = threading.Lock()
 
 
 
11
 
12
 
13
  def _normalize_symbol(raw_symbol: str) -> str:
@@ -31,6 +36,24 @@ def _normalize_symbol(raw_symbol: str) -> str:
31
  return symbol
32
 
33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34
  def _get_trade_calendar_cached() -> pd.DatetimeIndex:
35
  """
36
  Fetch and cache exchange trading dates in-process to avoid repeated
@@ -47,7 +70,10 @@ def _get_trade_calendar_cached() -> pd.DatetimeIndex:
47
  ):
48
  return _TRADE_CALENDAR_CACHE
49
 
50
- cal = ak.tool_trade_date_hist_sina()
 
 
 
51
  cal_col = "trade_date" if "trade_date" in cal.columns else "日期"
52
  all_dates = pd.to_datetime(cal[cal_col]).sort_values().drop_duplicates()
53
  cached = pd.DatetimeIndex(all_dates)
@@ -73,12 +99,16 @@ def fetch_stock_data(
73
  # 4x buffer to account for weekends/long holidays.
74
  start_date = (datetime.today() - timedelta(days=lookback * 4)).strftime("%Y%m%d")
75
 
76
- df = ak.stock_zh_a_hist(
77
- symbol=normalized_symbol,
78
- period="daily",
79
- start_date=start_date,
80
- end_date=end_date,
81
- adjust="qfq",
 
 
 
 
82
  )
83
 
84
  if df is None or df.empty:
 
1
  from datetime import datetime, timedelta
2
+ import os
3
  import threading
4
+ import time
5
 
6
  import akshare as ak
7
  import pandas as pd
 
10
  _TRADE_CALENDAR_CACHED_AT: datetime | None = None
11
  _TRADE_CALENDAR_CACHE_TTL = timedelta(hours=12)
12
  _TRADE_CALENDAR_LOCK = threading.Lock()
13
+ _AK_RETRY_COUNT = max(1, int(os.environ.get("AK_RETRY_COUNT", "3")))
14
+ _AK_RETRY_BASE_SLEEP = float(os.environ.get("AK_RETRY_BASE_SLEEP", "0.8"))
15
+ _AK_TIMEOUT = float(os.environ.get("AK_TIMEOUT", "15"))
16
 
17
 
18
  def _normalize_symbol(raw_symbol: str) -> str:
 
36
  return symbol
37
 
38
 
39
+ def _retry_ak_call(fn, *, call_name: str):
40
+ """
41
+ Retry wrapper for AkShare calls to handle transient network disconnects.
42
+ """
43
+ last_exc: Exception | None = None
44
+ for attempt in range(1, _AK_RETRY_COUNT + 1):
45
+ try:
46
+ return fn()
47
+ except Exception as exc: # pragma: no cover - external IO
48
+ last_exc = exc
49
+ if attempt >= _AK_RETRY_COUNT:
50
+ break
51
+ time.sleep(_AK_RETRY_BASE_SLEEP * (2 ** (attempt - 1)))
52
+ raise RuntimeError(
53
+ f"AkShare call failed after {_AK_RETRY_COUNT} attempts ({call_name}): {last_exc}"
54
+ ) from last_exc
55
+
56
+
57
  def _get_trade_calendar_cached() -> pd.DatetimeIndex:
58
  """
59
  Fetch and cache exchange trading dates in-process to avoid repeated
 
70
  ):
71
  return _TRADE_CALENDAR_CACHE
72
 
73
+ cal = _retry_ak_call(
74
+ lambda: ak.tool_trade_date_hist_sina(),
75
+ call_name="tool_trade_date_hist_sina",
76
+ )
77
  cal_col = "trade_date" if "trade_date" in cal.columns else "日期"
78
  all_dates = pd.to_datetime(cal[cal_col]).sort_values().drop_duplicates()
79
  cached = pd.DatetimeIndex(all_dates)
 
99
  # 4x buffer to account for weekends/long holidays.
100
  start_date = (datetime.today() - timedelta(days=lookback * 4)).strftime("%Y%m%d")
101
 
102
+ df = _retry_ak_call(
103
+ lambda: ak.stock_zh_a_hist(
104
+ symbol=normalized_symbol,
105
+ period="daily",
106
+ start_date=start_date,
107
+ end_date=end_date,
108
+ adjust="qfq",
109
+ timeout=_AK_TIMEOUT,
110
+ ),
111
+ call_name=f"stock_zh_a_hist(symbol={normalized_symbol})",
112
  )
113
 
114
  if df is None or df.empty: