superxuu commited on
Commit
f908ead
·
1 Parent(s): fa90a2e

fix: Update sync script with latest AkShare interfaces and North Exchange support

Browse files
Files changed (1) hide show
  1. backend/scripts/sync_data.py +54 -70
backend/scripts/sync_data.py CHANGED
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
30
  # 配置
31
  YEARS_OF_DATA = 10 # 初始获取近10年数据
32
  MAX_WORKERS = 5 # 并发线程数
33
- SYNC_LIMIT = -1 # 同步数量限制 (设置为 -1 表示全量同步)
34
 
35
 
36
  def get_stock_list() -> pd.DataFrame:
@@ -42,66 +42,62 @@ def get_stock_list() -> pd.DataFrame:
42
 
43
  all_lists = []
44
 
 
45
  try:
46
- # 1. A 股列表
47
- try:
48
- df_a = ak.stock_zh_a_spot_em()[['代码', '名称']]
49
- df_a.columns = ['code', 'name']
50
- df_a['market'] = df_a['code'].apply(lambda x: '主板' if x.startswith(('60', '00')) else ('创业板' if x.startswith('30') else ('科创板' if x.startswith('68') else '其他')))
51
- all_lists.append(df_a)
52
- except Exception as e:
53
- logger.warning(f"Failed to fetch A-share list from AkShare: {e}")
54
-
55
- # 2. ETF 列表
56
- try:
57
- df_etf = ak.fund_etf_category_sina(symbol="ETF基金")[['代码', '名称']]
58
- df_etf.columns = ['code', 'name']
59
- df_etf['market'] = 'ETF'
60
- all_lists.append(df_etf)
61
- except Exception as e:
62
- logger.warning(f"Failed to fetch ETF list from AkShare: {e}")
63
 
64
- # 3. LOF 列表
65
- try:
66
- df_lof = ak.fund_etf_category_sina(symbol="LOF基金")[['代码', '名称']]
67
- df_lof.columns = ['code', 'name']
68
- df_lof['market'] = 'LOF'
69
- all_lists.append(df_lof)
70
- except Exception as e:
71
- logger.warning(f"Failed to fetch LOF list from AkShare: {e}")
72
 
73
- # 4. 可转债列表
74
- try:
75
- df_cb = ak.bond_zh_hs_cov_spot()[['代码', '名称']]
76
- df_cb.columns = ['code', 'name']
77
- df_cb['market'] = '可转债'
78
- all_lists.append(df_cb)
79
- except Exception as e:
80
- logger.warning(f"Failed to fetch Convertible Bond list from AkShare: {e}")
81
 
82
- # 5. REITs 列表
83
- try:
84
- df_reits = ak.stock_reits_spot_em()[['代码', '名称']]
85
- df_reits.columns = ['code', 'name']
86
- df_reits['market'] = 'REITs'
87
- all_lists.append(df_reits)
88
- except Exception as e:
89
- logger.warning(f"Failed to fetch REITs list from AkShare: {e}")
 
 
 
 
90
 
91
- except Exception as global_e:
92
- logger.error(f"Global error during list fetching: {global_e}")
 
 
 
 
 
 
93
 
94
  if not all_lists:
95
- logger.warning("AkShare list fetching failed completely. Attempting to recover from database...")
96
  try:
97
  db = get_db()
98
  df_db = db.conn.execute("SELECT code, name, market FROM stock_list").df()
99
- if not df_db.empty:
100
- logger.info(f"Successfully recovered {len(df_db)} targets from database")
101
- return df_db
102
- except Exception as db_e:
103
- logger.error(f"Failed to recover from database: {db_e}")
104
-
105
  raise Exception("Failed to fetch target list from both AkShare and Database")
106
 
107
  df = pd.concat(all_lists).drop_duplicates(subset=['code'])
@@ -145,7 +141,6 @@ def get_target_daily(code: str, start_date: str, market: str) -> Optional[pd.Dat
145
  df = df.rename(columns={'日期': 'trade_date', '开盘': 'open', '最高': 'high', '最低': 'low', '收盘': 'close', '成交量': 'volume', '成交额': 'amount', '涨跌幅': 'pct_chg', '换手率': 'turnover_rate'})
146
  df['trade_date'] = pd.to_datetime(df['trade_date'])
147
  else:
148
- # 股票接口 (后复权)
149
  df = ak.stock_zh_a_hist(symbol=code, period="daily", start_date=fetch_start, end_date=end_date, adjust="hfq")
150
  if df is not None and not df.empty:
151
  df = df.rename(columns={
@@ -160,8 +155,10 @@ def get_target_daily(code: str, start_date: str, market: str) -> Optional[pd.Dat
160
  # 2. 尝试 Yahoo Finance (作为兜底)
161
  if df is None or df.empty:
162
  # 转换代码格式
163
- if code.startswith('6') or code.startswith('9') or code.startswith('5') or code.startswith('11'):
164
  yf_code = f"{code}.SS"
 
 
165
  else:
166
  yf_code = f"{code}.SZ"
167
 
@@ -182,7 +179,6 @@ def get_target_daily(code: str, start_date: str, market: str) -> Optional[pd.Dat
182
  if df is None or df.empty: return None
183
 
184
  df['code'] = code
185
- # 确保列顺序一致
186
  cols = ['code', 'trade_date', 'open', 'high', 'low', 'close', 'volume', 'amount', 'pct_chg', 'turnover_rate']
187
  return df[[c for c in cols if c in df.columns]]
188
 
@@ -210,7 +206,6 @@ def sync_stock_daily(targets: List[Dict[str, str]]) -> int:
210
  db = get_db()
211
  success_count = 0
212
 
213
- # 获取数据库中已有的最新日期映射
214
  logger.info("Checking existing data for incremental sync...")
215
  try:
216
  existing_latest = db.conn.execute("SELECT code, MAX(trade_date) FROM stock_daily GROUP BY code").fetchall()
@@ -248,7 +243,6 @@ def sync_stock_daily(targets: List[Dict[str, str]]) -> int:
248
 
249
  if all_data:
250
  combined_df = pd.concat(all_data, ignore_index=True)
251
- # 确保列存在
252
  try:
253
  db.conn.execute("ALTER TABLE stock_daily ADD COLUMN turnover_rate DOUBLE")
254
  except: pass
@@ -274,30 +268,21 @@ def main():
274
 
275
  # 2. 同步日线数据
276
  limit = SYNC_LIMIT
277
-
278
- # 获取数据库中已有的标的代码 (排除指数)
279
  existing_codes = [row[0] for row in db.conn.execute("SELECT DISTINCT code FROM stock_daily WHERE code != '000300'").fetchall()]
280
 
281
  if limit > 0:
282
- # 将总表转为字典方便查找
283
  all_targets_dict = {t['code']: t for t in target_list.to_dict('records')}
284
-
285
  final_targets = []
286
- # A. 先把数据库里已有的标的加进来 (保证持续更新)
287
  for code in existing_codes:
288
  if code in all_targets_dict:
289
  final_targets.append(all_targets_dict[code])
290
- if len(final_targets) >= limit:
291
- break
292
 
293
- # B. 如果已有的不足 limit,从总表中取新的补齐
294
  if len(final_targets) < limit:
295
  for _, row in target_list.iterrows():
296
  if row['code'] not in existing_codes:
297
  final_targets.append(row.to_dict())
298
- if len(final_targets) >= limit:
299
- break
300
-
301
  targets = final_targets
302
  logger.info(f"Syncing {len(targets)} targets (Prioritized {len(existing_codes)} existing)")
303
  else:
@@ -316,12 +301,11 @@ def main():
316
  # 4. 上传
317
  db.upload_db()
318
 
319
- # 5. 刷新后端缓存 (如果是在应用内运行)
320
  try:
321
  from app.core import clear_eligible_stocks_cache
322
  clear_eligible_stocks_cache()
323
- except:
324
- pass
325
 
326
  logger.info("Data sync completed successfully!")
327
 
 
30
  # 配置
31
  YEARS_OF_DATA = 10 # 初始获取近10年数据
32
  MAX_WORKERS = 5 # 并发线程数
33
+ SYNC_LIMIT = 10 # 同步数量限制 (设置为 -1 表示全量同步)
34
 
35
 
36
  def get_stock_list() -> pd.DataFrame:
 
42
 
43
  all_lists = []
44
 
45
+ # 1. A 股列表
46
  try:
47
+ df_a = ak.stock_zh_a_spot_em()[['代码', '名称']]
48
+ df_a.columns = ['code', 'name']
49
+ df_a['market'] = df_a['code'].apply(lambda x: '主板' if x.startswith(('60', '00')) else ('创业板' if x.startswith('30') else ('科创板' if x.startswith('68') else ('北交所' if x.startswith(('8', '4', '920')) else '其他'))))
50
+ all_lists.append(df_a)
51
+ except Exception as e:
52
+ logger.warning(f"Failed to fetch A-share list: {e}")
 
 
 
 
 
 
 
 
 
 
 
53
 
54
+ # 2. ETF 列表
55
+ try:
56
+ df_etf = ak.fund_etf_category_sina(symbol="ETF基金")[['代码', '名称']]
57
+ df_etf.columns = ['code', 'name']
58
+ df_etf['market'] = 'ETF'
59
+ all_lists.append(df_etf)
60
+ except Exception as e:
61
+ logger.warning(f"Failed to fetch ETF list: {e}")
62
 
63
+ # 3. LOF 列表
64
+ try:
65
+ df_lof = ak.fund_etf_category_sina(symbol="LOF基金")[['代码', '名称']]
66
+ df_lof.columns = ['code', 'name']
67
+ df_lof['market'] = 'LOF'
68
+ all_lists.append(df_lof)
69
+ except Exception as e:
70
+ logger.warning(f"Failed to fetch LOF list: {e}")
71
 
72
+ # 4. 可转债列表
73
+ try:
74
+ df_cb = ak.bond_zh_hs_cov_spot()
75
+ if '代码' in df_cb.columns:
76
+ df_cb = df_cb[['代码', '名称']]
77
+ elif 'symbol' in df_cb.columns:
78
+ df_cb = df_cb[['symbol', 'name']]
79
+ df_cb.columns = ['code', 'name']
80
+ df_cb['market'] = '可转债'
81
+ all_lists.append(df_cb)
82
+ except Exception as e:
83
+ logger.warning(f"Failed to fetch Convertible Bond list: {e}")
84
 
85
+ # 5. REITs 列表
86
+ try:
87
+ df_reits = ak.reit_real_time_em()[['代码', '名称']]
88
+ df_reits.columns = ['code', 'name']
89
+ df_reits['market'] = 'REITs'
90
+ all_lists.append(df_reits)
91
+ except Exception as e:
92
+ logger.warning(f"Failed to fetch REITs list: {e}")
93
 
94
  if not all_lists:
95
+ logger.warning("AkShare list fetching failed. Attempting to recover from database...")
96
  try:
97
  db = get_db()
98
  df_db = db.conn.execute("SELECT code, name, market FROM stock_list").df()
99
+ if not df_db.empty: return df_db
100
+ except: pass
 
 
 
 
101
  raise Exception("Failed to fetch target list from both AkShare and Database")
102
 
103
  df = pd.concat(all_lists).drop_duplicates(subset=['code'])
 
141
  df = df.rename(columns={'日期': 'trade_date', '开盘': 'open', '最高': 'high', '最低': 'low', '收盘': 'close', '成交量': 'volume', '成交额': 'amount', '涨跌幅': 'pct_chg', '换手率': 'turnover_rate'})
142
  df['trade_date'] = pd.to_datetime(df['trade_date'])
143
  else:
 
144
  df = ak.stock_zh_a_hist(symbol=code, period="daily", start_date=fetch_start, end_date=end_date, adjust="hfq")
145
  if df is not None and not df.empty:
146
  df = df.rename(columns={
 
155
  # 2. 尝试 Yahoo Finance (作为兜底)
156
  if df is None or df.empty:
157
  # 转换代码格式
158
+ if code.startswith(('6', '9', '5', '11')):
159
  yf_code = f"{code}.SS"
160
+ elif code.startswith(('8', '4', '920')):
161
+ yf_code = f"{code}.BJ" # 北交所
162
  else:
163
  yf_code = f"{code}.SZ"
164
 
 
179
  if df is None or df.empty: return None
180
 
181
  df['code'] = code
 
182
  cols = ['code', 'trade_date', 'open', 'high', 'low', 'close', 'volume', 'amount', 'pct_chg', 'turnover_rate']
183
  return df[[c for c in cols if c in df.columns]]
184
 
 
206
  db = get_db()
207
  success_count = 0
208
 
 
209
  logger.info("Checking existing data for incremental sync...")
210
  try:
211
  existing_latest = db.conn.execute("SELECT code, MAX(trade_date) FROM stock_daily GROUP BY code").fetchall()
 
243
 
244
  if all_data:
245
  combined_df = pd.concat(all_data, ignore_index=True)
 
246
  try:
247
  db.conn.execute("ALTER TABLE stock_daily ADD COLUMN turnover_rate DOUBLE")
248
  except: pass
 
268
 
269
  # 2. 同步日线数据
270
  limit = SYNC_LIMIT
 
 
271
  existing_codes = [row[0] for row in db.conn.execute("SELECT DISTINCT code FROM stock_daily WHERE code != '000300'").fetchall()]
272
 
273
  if limit > 0:
 
274
  all_targets_dict = {t['code']: t for t in target_list.to_dict('records')}
 
275
  final_targets = []
 
276
  for code in existing_codes:
277
  if code in all_targets_dict:
278
  final_targets.append(all_targets_dict[code])
279
+ if len(final_targets) >= limit: break
 
280
 
 
281
  if len(final_targets) < limit:
282
  for _, row in target_list.iterrows():
283
  if row['code'] not in existing_codes:
284
  final_targets.append(row.to_dict())
285
+ if len(final_targets) >= limit: break
 
 
286
  targets = final_targets
287
  logger.info(f"Syncing {len(targets)} targets (Prioritized {len(existing_codes)} existing)")
288
  else:
 
301
  # 4. 上传
302
  db.upload_db()
303
 
304
+ # 5. 刷新后端缓存
305
  try:
306
  from app.core import clear_eligible_stocks_cache
307
  clear_eligible_stocks_cache()
308
+ except: pass
 
309
 
310
  logger.info("Data sync completed successfully!")
311