superxuu commited on
Commit
b39d11d
·
1 Parent(s): db4cda7

修复同步稳定性与远程视图冲突

Browse files
backend/app/database.py CHANGED
@@ -102,6 +102,8 @@ class DatabaseManager:
102
  local_paths.append(f"'{path}'")
103
 
104
  files_sql = ", ".join(local_paths)
 
 
105
  conn.execute(f"CREATE OR REPLACE VIEW stock_daily AS SELECT * FROM read_parquet([{files_sql}])")
106
  logger.info(f"Remote stock_daily view created with {len(parquet_files)} partitions")
107
  else:
 
102
  local_paths.append(f"'{path}'")
103
 
104
  files_sql = ", ".join(local_paths)
105
+ conn.execute("DROP VIEW IF EXISTS stock_daily")
106
+ conn.execute("DROP TABLE IF EXISTS stock_daily")
107
  conn.execute(f"CREATE OR REPLACE VIEW stock_daily AS SELECT * FROM read_parquet([{files_sql}])")
108
  logger.info(f"Remote stock_daily view created with {len(parquet_files)} partitions")
109
  else:
backend/scripts/sync_data.py CHANGED
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
30
 
31
  # 配置
32
  YEARS_OF_DATA = 10
33
- MAX_WORKERS = 10
34
  SYNC_LIMIT = -1
35
  USE_YAHOO_FALLBACK = 0
36
 
@@ -89,7 +89,7 @@ def get_stock_list() -> pd.DataFrame:
89
 
90
  def get_target_daily(code: str, start_date: str, market: str) -> Optional[pd.DataFrame]:
91
  """抓取单只标的数据"""
92
- max_retries = 2
93
  for attempt in range(max_retries):
94
  try:
95
  end_date = datetime.now().strftime('%Y%m%d')
@@ -102,7 +102,12 @@ def get_target_daily(code: str, start_date: str, market: str) -> Optional[pd.Dat
102
  elif market == 'LOF':
103
  df = ak.fund_lof_hist_em(symbol=code, period="daily", start_date=fetch_start, end_date=end_date, adjust="hfq")
104
  elif market == '可转债':
105
- df = ak.bond_zh_hs_cov_daily(symbol=code)
 
 
 
 
 
106
  elif market == 'REITs':
107
  # REITs 接口仅支持 symbol 参数,返回全量历史后再按日期过滤
108
  df = ak.reits_hist_em(symbol=code)
@@ -112,17 +117,22 @@ def get_target_daily(code: str, start_date: str, market: str) -> Optional[pd.Dat
112
  if df is not None and not df.empty:
113
  # 标准化列名
114
  rename_map = {
115
- '日期': 'trade_date', 'date': 'trade_date',
116
- '开盘': 'open', '今开': 'open',
117
- '最高': 'high',
118
- '最低': 'low',
119
- '收盘': 'close', '最新价': 'close',
120
- '成交量': 'volume',
121
- '成交额': 'amount',
122
  '涨跌幅': 'pct_chg',
123
  '换手率': 'turnover_rate', '换手': 'turnover_rate'
124
  }
125
  df = df.rename(columns=rename_map)
 
 
 
 
 
126
  df['trade_date'] = pd.to_datetime(df['trade_date'])
127
  df = df[df['trade_date'] >= pd.to_datetime(start_date)]
128
  if 'amount' not in df.columns: df['amount'] = 0
@@ -136,6 +146,62 @@ def get_target_daily(code: str, start_date: str, market: str) -> Optional[pd.Dat
136
  time.sleep(1)
137
  return None
138
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
139
  def sync_stock_daily(targets: List[Dict[str, str]], last_trade_day: str) -> int:
140
  """增量同步逻辑 - 兼容云端视图"""
141
  db = get_db()
@@ -216,12 +282,10 @@ def main():
216
  target_list.to_parquet(list_parquet)
217
 
218
  # 2. 行情同步
219
- from scripts.sync_data import get_last_trading_day
220
  last_day = get_last_trading_day()
221
  sync_stock_daily(target_list.to_dict('records'), last_day)
222
 
223
  # 3. 指数同步
224
- from scripts.sync_data import get_index_daily
225
  idx_df = get_index_daily('000300')
226
  if idx_df is not None:
227
  idx_path = Path("backend/data/parquet/index_000300.parquet")
 
30
 
31
  # 配置
32
  YEARS_OF_DATA = 10
33
+ MAX_WORKERS = 5 # 降低并发数,减少超时
34
  SYNC_LIMIT = -1
35
  USE_YAHOO_FALLBACK = 0
36
 
 
89
 
90
  def get_target_daily(code: str, start_date: str, market: str) -> Optional[pd.DataFrame]:
91
  """抓取单只标的数据"""
92
+ max_retries = 3 # 增加重试次数
93
  for attempt in range(max_retries):
94
  try:
95
  end_date = datetime.now().strftime('%Y%m%d')
 
102
  elif market == 'LOF':
103
  df = ak.fund_lof_hist_em(symbol=code, period="daily", start_date=fetch_start, end_date=end_date, adjust="hfq")
104
  elif market == '可转债':
105
+ # 可转债接口通常使用 6 位数字代码,兼容 sh110xxx / sz12xxxx / bj81xxxx
106
+ cov_symbol = code[-6:] if len(code) > 6 else code
107
+ try:
108
+ df = ak.bond_zh_hs_cov_daily(symbol=cov_symbol)
109
+ except Exception:
110
+ df = ak.bond_zh_hs_cov_daily(symbol=code)
111
  elif market == 'REITs':
112
  # REITs 接口仅支持 symbol 参数,返回全量历史后再按日期过滤
113
  df = ak.reits_hist_em(symbol=code)
 
117
  if df is not None and not df.empty:
118
  # 标准化列名
119
  rename_map = {
120
+ '日期': 'trade_date', 'date': 'trade_date', 'Date': 'trade_date',
121
+ '开盘': 'open', '今开': 'open', 'Open': 'open',
122
+ '最高': 'high', 'High': 'high',
123
+ '最低': 'low', 'Low': 'low',
124
+ '收盘': 'close', '最新价': 'close', 'Close': 'close',
125
+ '成交量': 'volume', 'Volume': 'volume',
126
+ '成交额': 'amount', 'Amount': 'amount',
127
  '涨跌幅': 'pct_chg',
128
  '换手率': 'turnover_rate', '换手': 'turnover_rate'
129
  }
130
  df = df.rename(columns=rename_map)
131
+
132
+ # 如果还是没有 trade_date,尝试将索引转为列
133
+ if 'trade_date' not in df.columns:
134
+ df = df.reset_index().rename(columns={'index': 'trade_date', 'date': 'trade_date'})
135
+
136
  df['trade_date'] = pd.to_datetime(df['trade_date'])
137
  df = df[df['trade_date'] >= pd.to_datetime(start_date)]
138
  if 'amount' not in df.columns: df['amount'] = 0
 
146
  time.sleep(1)
147
  return None
148
 
149
+ def get_last_trading_day() -> str:
150
+ """获取最近一个交易日(用于增量截止日)"""
151
+ try:
152
+ df = ak.stock_zh_index_daily_em(symbol="sh000300")
153
+ if df is not None and not df.empty:
154
+ date_col = 'date' if 'date' in df.columns else ('日期' if '日期' in df.columns else None)
155
+ if date_col:
156
+ return pd.to_datetime(df[date_col].iloc[-1]).strftime('%Y-%m-%d')
157
+ except Exception as e:
158
+ logger.warning(f"Failed to get last trading day from index data: {e}")
159
+
160
+ # 回退:按工作日估算
161
+ d = datetime.now()
162
+ while d.weekday() >= 5: # 5=周六, 6=周日
163
+ d -= timedelta(days=1)
164
+ return d.strftime('%Y-%m-%d')
165
+
166
+
167
+ def get_index_daily(code: str) -> Optional[pd.DataFrame]:
168
+ """抓取指数日线(默认用于沪深300)"""
169
+ try:
170
+ symbol = f"sh{code}" if code.startswith('000') else f"sz{code}"
171
+ df = ak.stock_zh_index_daily_em(symbol=symbol)
172
+ if df is None or df.empty:
173
+ return None
174
+
175
+ rename_map = {
176
+ 'date': 'trade_date', '日期': 'trade_date',
177
+ 'open': 'open', '开盘': 'open',
178
+ 'high': 'high', '最高': 'high',
179
+ 'low': 'low', '最低': 'low',
180
+ 'close': 'close', '收盘': 'close',
181
+ 'volume': 'volume', '成交量': 'volume',
182
+ 'amount': 'amount', '成交额': 'amount',
183
+ 'pct_chg': 'pct_chg', '涨跌幅': 'pct_chg'
184
+ }
185
+ df = df.rename(columns=rename_map)
186
+ if 'trade_date' not in df.columns:
187
+ return None
188
+
189
+ df['trade_date'] = pd.to_datetime(df['trade_date'])
190
+ if 'amount' not in df.columns:
191
+ df['amount'] = 0
192
+ if 'pct_chg' not in df.columns:
193
+ df['pct_chg'] = df['close'].pct_change() * 100
194
+ if 'volume' not in df.columns:
195
+ df['volume'] = 0
196
+ df['turnover_rate'] = 0
197
+ df['code'] = code
198
+
199
+ return df[['code', 'trade_date', 'open', 'high', 'low', 'close', 'volume', 'amount', 'pct_chg', 'turnover_rate']]
200
+ except Exception as e:
201
+ logger.warning(f"Failed to fetch index {code}: {e}")
202
+ return None
203
+
204
+
205
  def sync_stock_daily(targets: List[Dict[str, str]], last_trade_day: str) -> int:
206
  """增量同步逻辑 - 兼容云端视图"""
207
  db = get_db()
 
282
  target_list.to_parquet(list_parquet)
283
 
284
  # 2. 行情同步
 
285
  last_day = get_last_trading_day()
286
  sync_stock_daily(target_list.to_dict('records'), last_day)
287
 
288
  # 3. 指数同步
 
289
  idx_df = get_index_daily('000300')
290
  if idx_df is not None:
291
  idx_path = Path("backend/data/parquet/index_000300.parquet")