superxu520 commited on
Commit
aa468bc
·
1 Parent(s): c6a3487

feat:incremental-sync

Browse files
Files changed (5) hide show
  1. app/__init__.py +3 -1
  2. app/database.py +275 -29
  3. main.py +85 -90
  4. requirements.txt +0 -2
  5. sync_data.py +873 -55
app/__init__.py CHANGED
@@ -1 +1,3 @@
1
- # App package for sync space
 
 
 
1
+ """
2
+ App package for stock data sync
3
+ """
app/database.py CHANGED
@@ -62,63 +62,81 @@ class DatabaseManager:
62
  """
63
  conn = self.conn
64
 
65
- # 检查本地是否已有数据
66
  if not force_download:
67
  try:
68
- # 尝试查询现有数据
69
  count = conn.execute("SELECT COUNT(*) FROM stock_list").fetchone()[0]
70
  if count > 0:
71
- logger.info(f"Local data exists ({count} stocks). Skip downloading.")
 
 
72
  return
73
  except Exception:
74
- # 表不存在,需要下载
75
  pass
76
 
77
- # 从 HF Dataset 下载数据
 
 
 
 
 
 
 
 
 
 
 
 
 
78
  if HF_TOKEN and DATASET_REPO_ID:
79
  logger.info("Downloading remote Parquet files from HF Dataset...")
80
  try:
81
  from huggingface_hub import list_repo_files, hf_hub_download
82
 
83
- # 1. 动态获取文件列表
84
  all_files = list_repo_files(repo_id=DATASET_REPO_ID, repo_type="dataset")
85
 
86
- # 2. 股票列表:下载并加载
87
- list_file = "data/stock_list.parquet"
88
- if list_file in all_files:
89
- local_list_path = hf_hub_download(repo_id=DATASET_REPO_ID, filename=list_file, repo_type="dataset")
90
- conn.execute(f"CREATE OR REPLACE TABLE stock_list AS SELECT * FROM read_parquet('{local_list_path}')")
91
- logger.info("Local stock_list table created from remote parquet")
 
92
 
93
- # 3. 日线数据:下载并加载
94
  parquet_files = [f for f in all_files if f.startswith("data/parquet/") and f.endswith(".parquet")]
95
-
96
  if parquet_files:
97
- local_paths = []
98
  for f in parquet_files:
99
- # hf_hub_download 会自动使用缓存,不会重复下载
100
- path = hf_hub_download(repo_id=DATASET_REPO_ID, filename=f, repo_type="dataset")
101
- local_paths.append(f"'{path}'")
 
 
102
 
103
- files_sql = ", ".join(local_paths)
104
- conn.execute("DROP VIEW IF EXISTS stock_daily")
105
- conn.execute("DROP TABLE IF EXISTS stock_daily")
106
- conn.execute(f"CREATE OR REPLACE VIEW stock_daily AS SELECT * FROM read_parquet([{files_sql}])")
107
- logger.info(f"Remote stock_daily view created with {len(parquet_files)} partitions")
108
  else:
109
- logger.warning("No parquet data files found in dataset")
110
  self._create_tables()
111
 
112
- # 验证
113
- count = conn.execute("SELECT COUNT(*) FROM stock_list").fetchone()[0]
114
- logger.info(f"Verification successful: {count} stocks found")
115
-
116
  except Exception as e:
117
  logger.error(f"Failed to load remote Parquet: {e}")
118
  self._create_tables()
119
  else:
120
  self._create_tables()
121
  logger.info("Local database initialized")
 
 
 
 
 
 
 
 
 
 
 
 
 
122
 
123
  def upload_db(self) -> None:
124
  """上传 Parquet 分区到 Hugging Face Dataset"""
@@ -156,6 +174,94 @@ class DatabaseManager:
156
  repo_type="dataset",
157
  )
158
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159
  logger.info(f"Parquet files uploaded to HF Dataset: {DATASET_REPO_ID}")
160
  except Exception as e:
161
  logger.error(f"Failed to upload to HF: {e}")
@@ -166,7 +272,7 @@ class DatabaseManager:
166
  """创建数据库表结构"""
167
  conn = self.conn
168
 
169
- # 日线行情表
170
  conn.execute("""
171
  CREATE TABLE IF NOT EXISTS stock_daily (
172
  code VARCHAR,
@@ -193,11 +299,151 @@ class DatabaseManager:
193
  )
194
  """)
195
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
196
  # 创建索引
197
  conn.execute("""
198
  CREATE INDEX IF NOT EXISTS idx_code_date
199
  ON stock_daily (code, trade_date)
200
  """)
 
 
 
 
 
 
 
 
 
 
 
 
201
 
202
  logger.info("Database tables created/verified")
203
 
 
62
  """
63
  conn = self.conn
64
 
65
+ # 1. 检查本地是否已有数据
66
  if not force_download:
67
  try:
 
68
  count = conn.execute("SELECT COUNT(*) FROM stock_list").fetchone()[0]
69
  if count > 0:
70
+ logger.info(f"Local database tables exist ({count} stocks).")
71
+ # 即使表存在,也要确保视图被创建(如果本地有 parquet 文件)
72
+ self._refresh_views()
73
  return
74
  except Exception:
 
75
  pass
76
 
77
+ # 2. 尝试本地 Parquet 文件恢复(Space 没重启的情况)
78
+ parquet_dir = Path(os.path.dirname(DUCKDB_PATH)) / "parquet"
79
+ list_file = Path(os.path.dirname(DUCKDB_PATH)) / "stock_list.parquet"
80
+
81
+ if not force_download and list_file.exists():
82
+ try:
83
+ conn.execute(f"CREATE OR REPLACE TABLE stock_list AS SELECT * FROM read_parquet('{list_file}')")
84
+ self._refresh_views()
85
+ logger.info("Database restored from local parquet files.")
86
+ return
87
+ except Exception as e:
88
+ logger.warning(f"Failed to restore from local parquet: {e}")
89
+
90
+ # 3. 从 HF Dataset 下载数据(Space 重启后的情况)
91
  if HF_TOKEN and DATASET_REPO_ID:
92
  logger.info("Downloading remote Parquet files from HF Dataset...")
93
  try:
94
  from huggingface_hub import list_repo_files, hf_hub_download
95
 
 
96
  all_files = list_repo_files(repo_id=DATASET_REPO_ID, repo_type="dataset")
97
 
98
+ # 下载股票列表
99
+ if "data/stock_list.parquet" in all_files:
100
+ local_list_path = hf_hub_download(repo_id=DATASET_REPO_ID, filename="data/stock_list.parquet", repo_type="dataset")
101
+ # 拷贝到本地数据目录
102
+ import shutil
103
+ shutil.copy(local_list_path, list_file)
104
+ conn.execute(f"CREATE OR REPLACE TABLE stock_list AS SELECT * FROM read_parquet('{list_file}')")
105
 
106
+ # 下载日线数据分区
107
  parquet_files = [f for f in all_files if f.startswith("data/parquet/") and f.endswith(".parquet")]
 
108
  if parquet_files:
 
109
  for f in parquet_files:
110
+ remote_path = hf_hub_download(repo_id=DATASET_REPO_ID, filename=f, repo_type="dataset")
111
+ dest_path = Path(os.path.dirname(DUCKDB_PATH)) / f.replace("data/", "")
112
+ dest_path.parent.mkdir(parents=True, exist_ok=True)
113
+ import shutil
114
+ shutil.copy(remote_path, dest_path)
115
 
116
+ self._refresh_views()
117
+ logger.info(f"Remote data downloaded and views created.")
 
 
 
118
  else:
 
119
  self._create_tables()
120
 
 
 
 
 
121
  except Exception as e:
122
  logger.error(f"Failed to load remote Parquet: {e}")
123
  self._create_tables()
124
  else:
125
  self._create_tables()
126
  logger.info("Local database initialized")
127
+
128
+ def _refresh_views(self) -> None:
129
+ """刷新数据库视图"""
130
+ conn = self.conn
131
+ parquet_dir = Path(os.path.dirname(DUCKDB_PATH)) / "parquet"
132
+
133
+ if parquet_dir.exists():
134
+ p_files = list(parquet_dir.glob("*.parquet"))
135
+ if p_files:
136
+ files_sql = ", ".join([f"'{str(f)}'" for f in p_files])
137
+ conn.execute("DROP VIEW IF EXISTS stock_daily")
138
+ conn.execute(f"CREATE OR REPLACE VIEW stock_daily AS SELECT * FROM read_parquet([{files_sql}])")
139
+ logger.info(f"Database views refreshed with {len(p_files)} partitions")
140
 
141
  def upload_db(self) -> None:
142
  """上传 Parquet 分区到 Hugging Face Dataset"""
 
174
  repo_type="dataset",
175
  )
176
 
177
+ # 3. 上传资金流向数据
178
+ fund_flow_path = Path(os.path.dirname(DUCKDB_PATH)) / "fund_flow.parquet"
179
+ if fund_flow_path.exists():
180
+ upload_file(
181
+ path_or_fileobj=str(fund_flow_path),
182
+ path_in_repo="data/fund_flow.parquet",
183
+ repo_id=DATASET_REPO_ID,
184
+ repo_type="dataset",
185
+ )
186
+ logger.info("Fund flow data uploaded")
187
+
188
+ # 4. 上传估值指标数据
189
+ valuation_path = Path(os.path.dirname(DUCKDB_PATH)) / "valuation.parquet"
190
+ if valuation_path.exists():
191
+ upload_file(
192
+ path_or_fileobj=str(valuation_path),
193
+ path_in_repo="data/valuation.parquet",
194
+ repo_id=DATASET_REPO_ID,
195
+ repo_type="dataset",
196
+ )
197
+ logger.info("Valuation data uploaded")
198
+
199
+ # 5. 上传融资融券数据
200
+ margin_path = Path(os.path.dirname(DUCKDB_PATH)) / "margin.parquet"
201
+ if margin_path.exists():
202
+ upload_file(
203
+ path_or_fileobj=str(margin_path),
204
+ path_in_repo="data/margin.parquet",
205
+ repo_id=DATASET_REPO_ID,
206
+ repo_type="dataset",
207
+ )
208
+ logger.info("Margin data uploaded")
209
+
210
+ # 6. 上传财务指标数据
211
+ financial_path = Path(os.path.dirname(DUCKDB_PATH)) / "financial_indicator.parquet"
212
+ if financial_path.exists():
213
+ upload_file(
214
+ path_or_fileobj=str(financial_path),
215
+ path_in_repo="data/financial_indicator.parquet",
216
+ repo_id=DATASET_REPO_ID,
217
+ repo_type="dataset",
218
+ )
219
+ logger.info("Financial indicator data uploaded")
220
+
221
+ # 7. 上传股东户数数据
222
+ holder_path = Path(os.path.dirname(DUCKDB_PATH)) / "holder_num.parquet"
223
+ if holder_path.exists():
224
+ upload_file(
225
+ path_or_fileobj=str(holder_path),
226
+ path_in_repo="data/holder_num.parquet",
227
+ repo_id=DATASET_REPO_ID,
228
+ repo_type="dataset",
229
+ )
230
+ logger.info("Holder number data uploaded")
231
+
232
+ # 8. 上传分红数据
233
+ dividend_path = Path(os.path.dirname(DUCKDB_PATH)) / "dividend.parquet"
234
+ if dividend_path.exists():
235
+ upload_file(
236
+ path_or_fileobj=str(dividend_path),
237
+ path_in_repo="data/dividend.parquet",
238
+ repo_id=DATASET_REPO_ID,
239
+ repo_type="dataset",
240
+ )
241
+ logger.info("Dividend data uploaded")
242
+
243
+ # 9. 上传十大股东数据
244
+ top_holders_path = Path(os.path.dirname(DUCKDB_PATH)) / "top_holders.parquet"
245
+ if top_holders_path.exists():
246
+ upload_file(
247
+ path_or_fileobj=str(top_holders_path),
248
+ path_in_repo="data/top_holders.parquet",
249
+ repo_id=DATASET_REPO_ID,
250
+ repo_type="dataset",
251
+ )
252
+ logger.info("Top holders data uploaded")
253
+
254
+ # 10. 上传限售解禁数据
255
+ restricted_path = Path(os.path.dirname(DUCKDB_PATH)) / "restricted_unlock.parquet"
256
+ if restricted_path.exists():
257
+ upload_file(
258
+ path_or_fileobj=str(restricted_path),
259
+ path_in_repo="data/restricted_unlock.parquet",
260
+ repo_id=DATASET_REPO_ID,
261
+ repo_type="dataset",
262
+ )
263
+ logger.info("Restricted unlock data uploaded")
264
+
265
  logger.info(f"Parquet files uploaded to HF Dataset: {DATASET_REPO_ID}")
266
  except Exception as e:
267
  logger.error(f"Failed to upload to HF: {e}")
 
272
  """创建数据库表结构"""
273
  conn = self.conn
274
 
275
+ # 日线行情表(保持原有结构不变)
276
  conn.execute("""
277
  CREATE TABLE IF NOT EXISTS stock_daily (
278
  code VARCHAR,
 
299
  )
300
  """)
301
 
302
+ # 资金流向表
303
+ conn.execute("""
304
+ CREATE TABLE IF NOT EXISTS stock_fund_flow (
305
+ code VARCHAR,
306
+ trade_date DATE,
307
+ close DOUBLE,
308
+ pct_chg DOUBLE,
309
+ main_net_inflow DOUBLE,
310
+ main_net_inflow_pct DOUBLE,
311
+ huge_net_inflow DOUBLE,
312
+ huge_net_inflow_pct DOUBLE,
313
+ large_net_inflow DOUBLE,
314
+ large_net_inflow_pct DOUBLE,
315
+ medium_net_inflow DOUBLE,
316
+ medium_net_inflow_pct DOUBLE,
317
+ small_net_inflow DOUBLE,
318
+ small_net_inflow_pct DOUBLE,
319
+ PRIMARY KEY (code, trade_date)
320
+ )
321
+ """)
322
+
323
+ # 估值指标表
324
+ conn.execute("""
325
+ CREATE TABLE IF NOT EXISTS stock_valuation (
326
+ code VARCHAR,
327
+ trade_date DATE,
328
+ pe_ttm DOUBLE,
329
+ pe_static DOUBLE,
330
+ pb DOUBLE,
331
+ ps_ttm DOUBLE,
332
+ dv_ratio DOUBLE,
333
+ total_mv DOUBLE,
334
+ circ_mv DOUBLE,
335
+ PRIMARY KEY (code, trade_date)
336
+ )
337
+ """)
338
+
339
+ # 融资融券表
340
+ conn.execute("""
341
+ CREATE TABLE IF NOT EXISTS stock_margin (
342
+ code VARCHAR,
343
+ trade_date DATE,
344
+ rzye DOUBLE,
345
+ rzmre DOUBLE,
346
+ rzche DOUBLE,
347
+ rqye DOUBLE,
348
+ rqmcl DOUBLE,
349
+ rzrqye DOUBLE,
350
+ PRIMARY KEY (code, trade_date)
351
+ )
352
+ """)
353
+
354
+ # 财务指标表
355
+ conn.execute("""
356
+ CREATE TABLE IF NOT EXISTS stock_financial_indicator (
357
+ code VARCHAR,
358
+ trade_date DATE,
359
+ roe DOUBLE,
360
+ roa DOUBLE,
361
+ gross_margin DOUBLE,
362
+ net_margin DOUBLE,
363
+ debt_ratio DOUBLE,
364
+ current_ratio DOUBLE,
365
+ quick_ratio DOUBLE,
366
+ inventory_turnover DOUBLE,
367
+ receivable_turnover DOUBLE,
368
+ total_asset_turnover DOUBLE,
369
+ PRIMARY KEY (code, trade_date)
370
+ )
371
+ """)
372
+
373
+ # 股东户数表
374
+ conn.execute("""
375
+ CREATE TABLE IF NOT EXISTS stock_holder_num (
376
+ code VARCHAR,
377
+ trade_date DATE,
378
+ holder_num BIGINT,
379
+ avg_share DOUBLE,
380
+ avg_value DOUBLE,
381
+ total_share DOUBLE,
382
+ total_value DOUBLE,
383
+ PRIMARY KEY (code, trade_date)
384
+ )
385
+ """)
386
+
387
+ # 历史分红表
388
+ conn.execute("""
389
+ CREATE TABLE IF NOT EXISTS stock_dividend (
390
+ code VARCHAR,
391
+ trade_date DATE,
392
+ dividend_type VARCHAR,
393
+ dividend_amount DOUBLE,
394
+ record_date DATE,
395
+ ex_date DATE,
396
+ pay_date DATE,
397
+ PRIMARY KEY (code, trade_date, dividend_type)
398
+ )
399
+ """)
400
+
401
+ # 十大股东表
402
+ conn.execute("""
403
+ CREATE TABLE IF NOT EXISTS stock_top_holders (
404
+ code VARCHAR,
405
+ trade_date DATE,
406
+ holder_name VARCHAR,
407
+ holder_type VARCHAR,
408
+ hold_num DOUBLE,
409
+ hold_ratio DOUBLE,
410
+ hold_change DOUBLE,
411
+ hold_change_ratio DOUBLE,
412
+ PRIMARY KEY (code, trade_date, holder_name)
413
+ )
414
+ """)
415
+
416
+ # 限售解禁表
417
+ conn.execute("""
418
+ CREATE TABLE IF NOT EXISTS stock_restricted_unlock (
419
+ code VARCHAR,
420
+ trade_date DATE,
421
+ unlock_date DATE,
422
+ unlock_num DOUBLE,
423
+ unlock_value DOUBLE,
424
+ unlock_ratio DOUBLE,
425
+ lock_type VARCHAR,
426
+ PRIMARY KEY (code, unlock_date)
427
+ )
428
+ """)
429
+
430
  # 创建索引
431
  conn.execute("""
432
  CREATE INDEX IF NOT EXISTS idx_code_date
433
  ON stock_daily (code, trade_date)
434
  """)
435
+ conn.execute("""
436
+ CREATE INDEX IF NOT EXISTS idx_fund_flow_code_date
437
+ ON stock_fund_flow (code, trade_date)
438
+ """)
439
+ conn.execute("""
440
+ CREATE INDEX IF NOT EXISTS idx_valuation_code_date
441
+ ON stock_valuation (code, trade_date)
442
+ """)
443
+ conn.execute("""
444
+ CREATE INDEX IF NOT EXISTS idx_margin_code_date
445
+ ON stock_margin (code, trade_date)
446
+ """)
447
 
448
  logger.info("Database tables created/verified")
449
 
main.py CHANGED
@@ -1,6 +1,8 @@
1
  """
2
- Sync Space 启动脚本 - 纯 Python
3
- 替代 start_sync.sh,更简洁、跨平台
 
 
4
  """
5
 
6
  import os
@@ -9,8 +11,13 @@ import logging
9
  import schedule
10
  import time
11
  import subprocess
 
 
12
  from datetime import datetime
13
- from pathlib import Path
 
 
 
14
 
15
  # 配置日志
16
  logging.basicConfig(
@@ -19,138 +26,126 @@ logging.basicConfig(
19
  )
20
  logger = logging.getLogger(__name__)
21
 
 
22
 
23
- def check_environment():
24
- """检查必需的环境变量"""
25
- logger.info("=" * 60)
26
- logger.info(" Stock Data Sync Space")
27
- logger.info("=" * 60)
28
-
29
- # 检查 HF_TOKEN
30
- hf_token = os.getenv("HF_TOKEN")
31
- if not hf_token:
32
- logger.error("ERROR: HF_TOKEN not set!")
33
- logger.error("Please set HF_TOKEN in Space secrets")
34
- sys.exit(1)
35
-
36
- # 检查 DATASET_REPO_ID
37
- dataset_repo = os.getenv("DATASET_REPO_ID")
38
- if not dataset_repo:
39
- logger.error("ERROR: DATASET_REPO_ID not set!")
40
- logger.error("Please set DATASET_REPO_ID in Space secrets")
41
- sys.exit(1)
42
-
43
- logger.info(f"Dataset: {dataset_repo}")
44
- return hf_token, dataset_repo
45
 
 
46
 
47
  def run_sync():
48
- """执行数据同步"""
49
  logger.info("=" * 60)
50
- logger.info(f"Starting sync at {datetime.now()}")
51
  logger.info("=" * 60)
52
 
53
  try:
54
- # 运行同步脚本
55
  result = subprocess.run(
56
  [sys.executable, "-u", "sync_data.py"],
57
- cwd="/app",
58
  capture_output=False,
59
  text=True
60
  )
61
 
62
  if result.returncode == 0:
63
- logger.info("✅ Sync completed successfully!")
64
  else:
65
- logger.error(f"❌ Sync failed with return code {result.returncode}")
66
 
 
 
67
  except Exception as e:
68
- logger.error(f"❌ Sync error: {e}")
69
-
70
-
71
- def run_once():
72
- """一次性同步模式"""
73
- logger.info("Running one-time sync...")
74
- run_sync()
75
- logger.info("Sync completed! Space will stop.")
76
- sys.exit(0)
77
-
78
 
79
  def parse_sync_times() -> list:
80
- """
81
- 解析同步时间配置
82
- 支持两种格式:
83
- 1. 单个时间:SYNC_TIME=18:00
84
- 2. 多个时间:SYNC_TIME=08:00,12:00,18:00
85
- """
86
  sync_time_str = os.getenv("SYNC_TIME", "18:00")
87
-
88
- # 分割多个时间点
89
  times = [t.strip() for t in sync_time_str.split(",")]
90
-
91
- # 验证时间格式
92
  valid_times = []
93
  for t in times:
94
  try:
95
- # 验证格式 HH:MM
96
  hour, minute = t.split(":")
97
  if 0 <= int(hour) <= 23 and 0 <= int(minute) <= 59:
98
  valid_times.append(t)
99
- else:
100
- logger.warning(f"Invalid time format: {t}, skipping")
101
- except Exception as e:
102
- logger.warning(f"Invalid time format: {t}, skipping. Error: {e}")
103
-
104
- if not valid_times:
105
- logger.warning("No valid sync times found, using default: 18:00")
106
- return ["18:00"]
107
-
108
- return valid_times
109
-
110
 
111
- def run_schedule():
112
- """定时同步模式"""
 
113
  sync_times = parse_sync_times()
114
- sync_on_start = os.getenv("SYNC_ON_START", "false").lower() == "true"
115
 
116
- if len(sync_times) == 1:
117
- logger.info(f"Schedule mode: Daily sync at {sync_times[0]} (Beijing time)")
118
- else:
119
- logger.info(f"Schedule mode: Daily sync at {', '.join(sync_times)} (Beijing time)")
120
 
121
- # 设置多个定时任务
122
- for sync_time in sync_times:
123
- schedule.every().day.at(sync_time).do(run_sync)
124
- logger.info(f" - Scheduled sync at {sync_time}")
125
 
126
- # 启动时立即执行一次(可选)
127
  if sync_on_start:
128
  logger.info("Running initial sync on startup...")
129
  run_sync()
130
 
131
- logger.info(f"Scheduler started. Total {len(sync_times)} sync time(s) configured")
132
-
133
- # 保持运行
134
- while True:
135
  schedule.run_pending()
136
- time.sleep(60)
 
 
137
 
 
138
 
139
  def main():
140
- """主入口"""
141
- # 检查环境
142
- check_environment()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
143
 
144
- # 获取同步模式
145
- sync_mode = os.getenv("SYNC_MODE", "schedule")
146
- logger.info(f"Sync mode: {sync_mode}")
147
 
148
- # 根据模式运行
 
149
  if sync_mode == "once":
150
- run_once()
 
 
 
 
 
151
  else:
152
- run_schedule()
153
-
154
 
155
  if __name__ == "__main__":
156
  main()
 
1
  """
2
+ Sync Space 启动脚本 - 增强
3
+ 1. 启动轻量级 Web 服务以绕过 HF 健康检查
4
+ 2. 异步运行数据同步调度器
5
+ 3. 支持多时间点定时触发
6
  """
7
 
8
  import os
 
11
  import schedule
12
  import time
13
  import subprocess
14
+ import threading
15
+ import signal
16
  from datetime import datetime
17
+ from http.server import BaseHTTPRequestHandler, HTTPServer
18
+
19
+ # 全局退出标志
20
+ _shutdown_requested = False
21
 
22
  # 配置日志
23
  logging.basicConfig(
 
26
  )
27
  logger = logging.getLogger(__name__)
28
 
29
+ # ==================== 1. 健康检查 Web 服务 ====================
30
 
31
+ class HealthCheckHandler(BaseHTTPRequestHandler):
32
+ def do_GET(self):
33
+ self.send_response(200)
34
+ self.send_header('Content-type', 'text/html')
35
+ self.end_headers()
36
+ content = f"Stock Data Sync Space is running.<br>Last check: {datetime.now()}"
37
+ self.wfile.write(content.encode())
38
+
39
+ def log_message(self, format, *args):
40
+ # 禁用访问日志,保持控制台干净
41
+ return
42
+
43
+ def run_health_check_server(port=7860):
44
+ """启动 Web 服务响应 HF 健康检查"""
45
+ server_address = ('', port)
46
+ httpd = HTTPServer(server_address, HealthCheckHandler)
47
+ logger.info(f"Health check server started on port {port}")
48
+ httpd.serve_forever()
 
 
 
 
49
 
50
+ # ==================== 2. 同步逻辑与调度 ====================
51
 
52
  def run_sync():
53
+ """执行数据同步脚本"""
54
  logger.info("=" * 60)
55
+ logger.info(f"Starting sync task at {datetime.now()}")
56
  logger.info("=" * 60)
57
 
58
  try:
59
+ # 使用 -u 参数确保日志实时输出
60
  result = subprocess.run(
61
  [sys.executable, "-u", "sync_data.py"],
62
+ cwd=os.path.dirname(os.path.abspath(__file__)),
63
  capture_output=False,
64
  text=True
65
  )
66
 
67
  if result.returncode == 0:
68
+ logger.info("✅ Sync task completed successfully!")
69
  else:
70
+ logger.error(f"❌ Sync task failed with return code {result.returncode}")
71
 
72
+ except subprocess.CalledProcessError as e:
73
+ logger.error(f"❌ Sync task failed with return code {e.returncode}")
74
  except Exception as e:
75
+ logger.error(f"❌ Sync task error: {e}")
 
 
 
 
 
 
 
 
 
76
 
77
  def parse_sync_times() -> list:
78
+ """解析 SYNC_TIME 环境变量 (格式: 14:40,18:00)"""
 
 
 
 
 
79
  sync_time_str = os.getenv("SYNC_TIME", "18:00")
 
 
80
  times = [t.strip() for t in sync_time_str.split(",")]
 
 
81
  valid_times = []
82
  for t in times:
83
  try:
 
84
  hour, minute = t.split(":")
85
  if 0 <= int(hour) <= 23 and 0 <= int(minute) <= 59:
86
  valid_times.append(t)
87
+ except (ValueError, IndexError):
88
+ logger.warning(f"Invalid time format: {t}, skipping")
89
+ return valid_times if valid_times else ["18:00"]
 
 
 
 
 
 
 
 
90
 
91
+ def scheduler_loop():
92
+ """调度器主循环"""
93
+ global _shutdown_requested
94
  sync_times = parse_sync_times()
95
+ sync_on_start = os.getenv("SYNC_ON_START", "true").lower() == "true"
96
 
97
+ logger.info(f"Scheduler configured for times: {', '.join(sync_times)}")
 
 
 
98
 
99
+ for t in sync_times:
100
+ schedule.every().day.at(t).do(run_sync)
 
 
101
 
102
+ # 启动时立即执行一次
103
  if sync_on_start:
104
  logger.info("Running initial sync on startup...")
105
  run_sync()
106
 
107
+ while not _shutdown_requested:
 
 
 
108
  schedule.run_pending()
109
+ time.sleep(30)
110
+
111
+ logger.info("Scheduler loop exiting gracefully.")
112
 
113
+ # ==================== 3. 主入口 ====================
114
 
115
  def main():
116
+ # 1. 检查必需的环境变量
117
+ hf_token = os.getenv("HF_TOKEN")
118
+ dataset_repo = os.getenv("DATASET_REPO_ID")
119
+ if not hf_token or not dataset_repo:
120
+ logger.error("HF_TOKEN or DATASET_REPO_ID not set! Please check Space secrets.")
121
+ sys.exit(1)
122
+
123
+ # 2. 启动健康检查 Web 服务 (在后台线程)
124
+ port = int(os.getenv("PORT", 7860))
125
+ web_thread = threading.Thread(target=run_health_check_server, args=(port,), daemon=True)
126
+ web_thread.start()
127
+
128
+ # 3. 设置信号处理
129
+ def signal_handler(signum, frame):
130
+ global _shutdown_requested
131
+ logger.info(f"Received signal {signum}, shutting down gracefully...")
132
+ _shutdown_requested = True
133
 
134
+ signal.signal(signal.SIGTERM, signal_handler)
135
+ signal.signal(signal.SIGINT, signal_handler)
 
136
 
137
+ # 4. 运行调度器 (在主线程)
138
+ sync_mode = os.getenv("SYNC_MODE", "schedule")
139
  if sync_mode == "once":
140
+ logger.info("Mode: ONCE - Running sync and exiting.")
141
+ run_sync()
142
+ logger.info("Sync completed. Keeping container alive for health check.")
143
+ # 保持主线程运行,否则容器会退出
144
+ while not _shutdown_requested:
145
+ time.sleep(3600)
146
  else:
147
+ logger.info("Mode: SCHEDULE - Starting scheduler.")
148
+ scheduler_loop()
149
 
150
  if __name__ == "__main__":
151
  main()
requirements.txt CHANGED
@@ -4,7 +4,5 @@ pandas>=2.0.0
4
  duckdb>=0.9.0
5
  huggingface-hub>=0.20.0
6
  python-dotenv>=1.0.0
7
- yfinance>=0.2.0
8
- pytz>=2023.3
9
  schedule>=1.2.0 # 定时任务调度
10
  pyarrow>=14.0.0 # Parquet 文件支持
 
4
  duckdb>=0.9.0
5
  huggingface-hub>=0.20.0
6
  python-dotenv>=1.0.0
 
 
7
  schedule>=1.2.0 # 定时任务调度
8
  pyarrow>=14.0.0 # Parquet 文件支持
sync_data.py CHANGED
@@ -30,8 +30,63 @@ logger = logging.getLogger(__name__)
30
 
31
  # 配置
32
  YEARS_OF_DATA = 10
33
- MAX_WORKERS = 5 # 降低并发数,减少超时
34
- SYNC_LIMIT = -1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
 
36
  def get_stock_list() -> pd.DataFrame:
37
  """获取全市场标的列表"""
@@ -163,17 +218,30 @@ def get_target_daily(code: str, start_date: str, market: str) -> Optional[pd.Dat
163
  return None
164
 
165
  def get_last_trading_day() -> str:
166
- """获取最近一个交易日"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
167
  try:
168
  df = ak.stock_zh_index_daily_em(symbol="sh000300")
169
  if df is not None and not df.empty:
170
  date_col = 'date' if 'date' in df.columns else ('日期' if '日期' in df.columns else None)
171
  if date_col:
172
  return pd.to_datetime(df[date_col].iloc[-1]).strftime('%Y-%m-%d')
173
- except Exception as e:
174
- logger.warning(f"Failed to get last trading day: {e}")
175
 
176
- # 回退:按工作日估算
177
  d = get_beijing_time()
178
  while d.weekday() >= 5:
179
  d -= timedelta(days=1)
@@ -237,11 +305,17 @@ def sync_stock_daily(targets: List[Dict[str, str]], last_trade_day: str) -> int:
237
  t['start_dt'] = start_dt
238
  pending.append(t)
239
 
 
 
 
 
 
 
240
  if not pending: return 0
241
  logger.info(f"Syncing {len(pending)} targets...")
242
 
243
  all_new_data = []
244
- with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
245
  futures = {executor.submit(get_target_daily, t['code'], t['start_dt'], t['market']): t['code'] for t in pending}
246
  for i, future in enumerate(as_completed(futures), 1):
247
  res = future.result()
@@ -259,62 +333,806 @@ def sync_stock_daily(targets: List[Dict[str, str]], last_trade_day: str) -> int:
259
  local_path = Path(f"/tmp/data/parquet/{filename}") # Sync Space 使用 /tmp
260
  local_path.parent.mkdir(parents=True, exist_ok=True)
261
 
262
- # 云端增量核心:先从云端拉取旧的月份文件
263
- repo_id = os.getenv("DATASET_REPO_ID")
264
- if repo_id:
265
  try:
266
- old_file = hf_hub_download(repo_id=repo_id, filename=f"data/parquet/{filename}", repo_type="dataset")
267
- old_df = pd.read_parquet(old_file)
268
- # 合并
269
- month_inc = inc_df[(inc_df['trade_date'].dt.year == yr) & (inc_df['trade_date'].dt.month == mo)]
270
- final_month_df = pd.concat([old_df, month_inc]).drop_duplicates(subset=['code', 'trade_date'])
271
- final_month_df.to_parquet(local_path)
272
- logger.info(f"Merged cloud data for {filename}")
273
- continue
274
- except: pass
275
-
276
- # 如果云端没有,直接保存
 
 
 
 
277
  month_inc = inc_df[(inc_df['trade_date'].dt.year == yr) & (inc_df['trade_date'].dt.month == mo)]
278
- month_inc.to_parquet(local_path)
279
- logger.info(f"Saved new data for {filename}")
 
 
 
 
 
280
 
281
  return len(all_new_data)
282
 
283
- def main():
284
- logger.info("=" * 60)
285
- logger.info("Stock Data Sync Started")
286
- logger.info("=" * 60)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
287
 
288
- db = get_db()
289
- db.init_db()
290
-
291
- # 1. 列表同步
292
- target_list = get_stock_list()
293
- list_parquet = Path("/tmp/data/stock_list.parquet")
294
- list_parquet.parent.mkdir(parents=True, exist_ok=True)
295
- target_list.to_parquet(list_parquet)
296
-
297
- # 2. 行情同步
298
- last_day = get_last_trading_day()
299
- logger.info(f"Last trading day: {last_day}")
300
- sync_count = sync_stock_daily(target_list.to_dict('records'), last_day)
301
- logger.info(f"Synced {sync_count} stock-day records")
302
-
303
- # 3. 指数同步
304
- idx_df = get_index_daily('000300')
305
- if idx_df is not None:
306
- idx_path = Path("/tmp/data/parquet/index_000300.parquet")
307
- idx_path.parent.mkdir(parents=True, exist_ok=True)
308
- idx_df.to_parquet(idx_path)
309
- logger.info("Index data synced")
310
-
311
- # 4. 上传
312
- logger.info("Uploading to Hugging Face Dataset...")
313
- db.upload_db()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
314
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
315
  logger.info("=" * 60)
316
- logger.info("Sync Completed Successfully!")
317
  logger.info("=" * 60)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
318
 
319
  if __name__ == "__main__":
320
- main()
 
30
 
31
  # 配置
32
  YEARS_OF_DATA = 10
33
+
34
+ def _safe_int_env(var_name: str, default: int) -> int:
35
+ """安全地读取环境变量并转换为整数"""
36
+ try:
37
+ value = os.getenv(var_name)
38
+ if value is None:
39
+ return default
40
+ return int(value)
41
+ except (ValueError, TypeError):
42
+ logger.warning(f"Invalid value for {var_name}, using default: {default}")
43
+ return default
44
+
45
+ # 动态线程数配置(延迟计算,避免导入时触发 multiprocessing)
46
+ def get_thread_config():
47
+ """获取线程池配置(延迟计算)"""
48
+ import multiprocessing
49
+ cpu_count = multiprocessing.cpu_count()
50
+
51
+ # 分层并发策略
52
+ config = {
53
+ 'daily': _safe_int_env("MAX_WORKERS_DAILY", cpu_count * 4),
54
+ 'fund': _safe_int_env("MAX_WORKERS_FUND", cpu_count * 3),
55
+ 'valuation': _safe_int_env("MAX_WORKERS_VALUATION", cpu_count * 3),
56
+ 'margin': _safe_int_env("MAX_WORKERS_MARGIN", cpu_count * 3),
57
+ 'financial': _safe_int_env("MAX_WORKERS_FINANCIAL", cpu_count * 2),
58
+ 'dividend': _safe_int_env("MAX_WORKERS_DIVIDEND", cpu_count * 2),
59
+ }
60
+
61
+ # 向后兼容
62
+ legacy = _safe_int_env("MAX_WORKERS", 0)
63
+ if legacy > 0:
64
+ config = {k: legacy for k in config}
65
+
66
+ return cpu_count, config
67
+
68
+ # 延迟初始化线程配置(在 main 中调用)
69
+ _CPU_COUNT = None
70
+ _THREAD_CONFIG = None
71
+ _thread_config_lock = threading.Lock()
72
+
73
+ def get_thread_config_safe():
74
+ """安全获取线程配置(自动初始化,线程安全)"""
75
+ global _CPU_COUNT, _THREAD_CONFIG
76
+ if _THREAD_CONFIG is None:
77
+ with _thread_config_lock:
78
+ # 双重检查锁定模式
79
+ if _THREAD_CONFIG is None:
80
+ _CPU_COUNT, _THREAD_CONFIG = get_thread_config()
81
+ logger.info(f"Thread pool config: CPU={_CPU_COUNT}, "
82
+ f"Daily={_THREAD_CONFIG['daily']}, Fund={_THREAD_CONFIG['fund']}, "
83
+ f"Valuation={_THREAD_CONFIG['valuation']}, Margin={_THREAD_CONFIG['margin']}, "
84
+ f"Financial={_THREAD_CONFIG['financial']}, Dividend={_THREAD_CONFIG['dividend']}")
85
+ return _THREAD_CONFIG
86
+
87
+ def init_thread_config():
88
+ """初始化线程配置(在 main 中调用)"""
89
+ get_thread_config_safe()
90
 
91
  def get_stock_list() -> pd.DataFrame:
92
  """获取全市场标的列表"""
 
218
  return None
219
 
220
  def get_last_trading_day() -> str:
221
+ """获取最近一个交易日(优先使用交易日历)"""
222
+ try:
223
+ # 获取交易日历(新浪接口,包含未来日期)
224
+ df = ak.tool_trade_date_hist_sina()
225
+ if df is not None and not df.empty:
226
+ # 转换为日期格式并过滤出 <= 今天的日期
227
+ df['trade_date'] = pd.to_datetime(df['trade_date']).dt.date
228
+ today = get_beijing_time().date()
229
+ # 找到最后一个交易日
230
+ last_day = df[df['trade_date'] <= today]['trade_date'].iloc[-1]
231
+ return last_day.strftime('%Y-%m-%d')
232
+ except Exception as e:
233
+ logger.warning(f"Failed to get trading calendar: {e}")
234
+
235
+ # 回退:使用指数行情
236
  try:
237
  df = ak.stock_zh_index_daily_em(symbol="sh000300")
238
  if df is not None and not df.empty:
239
  date_col = 'date' if 'date' in df.columns else ('日期' if '日期' in df.columns else None)
240
  if date_col:
241
  return pd.to_datetime(df[date_col].iloc[-1]).strftime('%Y-%m-%d')
242
+ except Exception: pass
 
243
 
244
+ # 最终回退:按工作日估算
245
  d = get_beijing_time()
246
  while d.weekday() >= 5:
247
  d -= timedelta(days=1)
 
305
  t['start_dt'] = start_dt
306
  pending.append(t)
307
 
308
+ # 应用 SYNC_LIMIT 限制
309
+ sync_limit = int(os.getenv("SYNC_LIMIT", -1))
310
+ if sync_limit > 0 and len(pending) > sync_limit:
311
+ logger.info(f"Limiting sync to first {sync_limit} targets (out of {len(pending)})")
312
+ pending = pending[:sync_limit]
313
+
314
  if not pending: return 0
315
  logger.info(f"Syncing {len(pending)} targets...")
316
 
317
  all_new_data = []
318
+ with ThreadPoolExecutor(max_workers=get_thread_config_safe()['daily']) as executor:
319
  futures = {executor.submit(get_target_daily, t['code'], t['start_dt'], t['market']): t['code'] for t in pending}
320
  for i, future in enumerate(as_completed(futures), 1):
321
  res = future.result()
 
333
  local_path = Path(f"/tmp/data/parquet/{filename}") # Sync Space 使用 /tmp
334
  local_path.parent.mkdir(parents=True, exist_ok=True)
335
 
336
+ # 增量核心:先检查本地是否有,没有再从云端拉取
337
+ old_df = None
338
+ if local_path.exists():
339
  try:
340
+ old_df = pd.read_parquet(local_path)
341
+ logger.info(f"Using local cache for {filename}")
342
+ except Exception: pass
343
+
344
+ if old_df is None:
345
+ repo_id = os.getenv("DATASET_REPO_ID")
346
+ if repo_id:
347
+ try:
348
+ old_file = hf_hub_download(repo_id=repo_id, filename=f"data/parquet/{filename}", repo_type="dataset")
349
+ old_df = pd.read_parquet(old_file)
350
+ logger.info(f"Downloaded {filename} from cloud")
351
+ except Exception:
352
+ pass
353
+
354
+ # 合并新数据
355
  month_inc = inc_df[(inc_df['trade_date'].dt.year == yr) & (inc_df['trade_date'].dt.month == mo)]
356
+ if old_df is not None:
357
+ final_month_df = pd.concat([old_df, month_inc]).drop_duplicates(subset=['code', 'trade_date'])
358
+ else:
359
+ final_month_df = month_inc
360
+
361
+ final_month_df.to_parquet(local_path)
362
+ logger.info(f"Saved updated data for {filename}")
363
 
364
  return len(all_new_data)
365
 
366
+
367
+ # ==================== 新增:资金流向数据同步 ====================
368
+
369
+ def get_stock_fund_flow(code: str, market: str) -> Optional[pd.DataFrame]:
370
+ """获取单只股票资金流向数据"""
371
+ max_retries = 3
372
+ for attempt in range(max_retries):
373
+ try:
374
+ # 确定 market 参数
375
+ if market == '北交所' or code.startswith(('8', '4', '920')):
376
+ mk = 'bj'
377
+ elif code.startswith(('6', '9')):
378
+ mk = 'sh'
379
+ else:
380
+ mk = 'sz'
381
+
382
+ df = ak.stock_individual_fund_flow(stock=code, market=mk)
383
+ if df is not None and not df.empty:
384
+ # 标准化列名
385
+ rename_map = {
386
+ '日期': 'trade_date', '收盘价': 'close', '涨跌幅': 'pct_chg',
387
+ '主力净流入-净额': 'main_net_inflow',
388
+ '主力净流入-净占比': 'main_net_inflow_pct',
389
+ '超大单净流入-净额': 'huge_net_inflow',
390
+ '超大单净流入-净占比': 'huge_net_inflow_pct',
391
+ '大单净流入-净额': 'large_net_inflow',
392
+ '大单净流入-净占比': 'large_net_inflow_pct',
393
+ '中单净流入-净额': 'medium_net_inflow',
394
+ '中单净流入-净占比': 'medium_net_inflow_pct',
395
+ '小单净流入-净额': 'small_net_inflow',
396
+ '小单净流入-净占比': 'small_net_inflow_pct',
397
+ }
398
+ df = df.rename(columns=rename_map)
399
+ df['trade_date'] = pd.to_datetime(df['trade_date'])
400
+ df['code'] = code
401
+
402
+ cols = ['code', 'trade_date', 'close', 'pct_chg',
403
+ 'main_net_inflow', 'main_net_inflow_pct',
404
+ 'huge_net_inflow', 'huge_net_inflow_pct',
405
+ 'large_net_inflow', 'large_net_inflow_pct',
406
+ 'medium_net_inflow', 'medium_net_inflow_pct',
407
+ 'small_net_inflow', 'small_net_inflow_pct']
408
+ return df[[c for c in cols if c in df.columns]]
409
+ except Exception as e:
410
+ if attempt == max_retries - 1:
411
+ pass # 静默失败,很���股票可能没有资金流向数据
412
+ time.sleep(0.5)
413
+ return None
414
+
415
+
416
+ def sync_fund_flow(targets: List[Dict[str, str]], last_trade_day: str) -> int:
417
+ """同步资金流向数据(极致增量版)"""
418
+ logger.info("Syncing fund flow data...")
419
+ flow_path = Path("/tmp/data/fund_flow.parquet")
420
+ flow_path.parent.mkdir(parents=True, exist_ok=True)
421
 
422
+ old_df = None
423
+ global_latest_date = "2000-01-01"
424
+ existing_codes = set()
425
+
426
+ # 1. 优先读取本地缓存
427
+ if flow_path.exists():
428
+ try:
429
+ old_df = pd.read_parquet(flow_path)
430
+ global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
431
+ existing_codes = set(old_df['code'].unique())
432
+ logger.info(f"Local fund flow cache found, latest date: {global_latest_date}")
433
+ except Exception as e:
434
+ logger.warning(f"Failed to read local fund flow cache: {e}")
435
+
436
+ # 2. 本地无缓存,尝试从云端拉取
437
+ if old_df is None:
438
+ repo_id = os.getenv("DATASET_REPO_ID")
439
+ if repo_id:
440
+ try:
441
+ old_file = hf_hub_download(repo_id=repo_id, filename="data/fund_flow.parquet", repo_type="dataset")
442
+ old_df = pd.read_parquet(old_file)
443
+ global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
444
+ existing_codes = set(old_df['code'].unique())
445
+ old_df.to_parquet(flow_path) # 存入本地缓存
446
+ logger.info(f"Downloaded fund flow from cloud, latest date: {global_latest_date}")
447
+ except Exception:
448
+ logger.info("No existing fund flow data found in cloud.")
449
+
450
+ # 3. 全局水位线拦截 + 新股检测
451
+ stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']]
452
+ new_codes = [t for t in stock_targets if t['code'] not in existing_codes]
453
+
454
+ if global_latest_date >= last_trade_day and not new_codes:
455
+ logger.info(f"Fund flow data is already up to date ({global_latest_date}) and no new stocks. Skip.")
456
+ return 0
457
+
458
+ # 4. 增量获取
459
+ if global_latest_date >= last_trade_day:
460
+ logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.")
461
+ sync_targets = new_codes
462
+ else:
463
+ logger.info(f"Syncing fund flow from {global_latest_date} to {last_trade_day}...")
464
+ sync_targets = stock_targets
465
+
466
+ all_data = []
467
+ success_count = 0
468
+
469
+ with ThreadPoolExecutor(max_workers=get_thread_config_safe()['fund']) as executor:
470
+ futures = {executor.submit(get_stock_fund_flow, t['code'], t['market']): t['code'] for t in sync_targets}
471
+ for i, future in enumerate(as_completed(futures), 1):
472
+ res = future.result()
473
+ if res is not None and not res.empty:
474
+ # 只保留新日期的数据
475
+ code = futures[future]
476
+ # 如果是老股票,只保留大于全局最新日期的数据
477
+ if code in existing_codes:
478
+ res = res[res['trade_date'] > pd.to_datetime(global_latest_date)]
479
+
480
+ if not res.empty:
481
+ all_data.append(res)
482
+ success_count += 1
483
+ if i % 500 == 0:
484
+ logger.info(f"Fund flow progress: {i}/{len(sync_targets)}, success: {success_count}")
485
+
486
+ # 5. 合并保存
487
+ if all_data:
488
+ new_df = pd.concat(all_data, ignore_index=True)
489
+ final_df = pd.concat([old_df, new_df]) if old_df is not None else new_df
490
+ final_df = final_df.drop_duplicates(subset=['code', 'trade_date'])
491
+ final_df.to_parquet(flow_path)
492
+ logger.info(f"Fund flow updated: {len(final_df)} total records")
493
+
494
+ return success_count
495
+
496
+
497
+ # ==================== 新增:估值指标数据同步 ====================
498
+
499
+ def get_stock_valuation(code: str) -> Optional[pd.DataFrame]:
500
+ """获取单只股票估值指标数据"""
501
+ max_retries = 3
502
+ for attempt in range(max_retries):
503
+ try:
504
+ df = ak.stock_a_lg_indicator(symbol=code)
505
+ if df is not None and not df.empty:
506
+ # 标准化列名
507
+ rename_map = {
508
+ '日期': 'trade_date',
509
+ '市盈率': 'pe_ttm',
510
+ '市盈率TTM': 'pe_ttm',
511
+ '静态市盈率': 'pe_static',
512
+ '市净率': 'pb',
513
+ '市销率': 'ps_ttm',
514
+ '股息率': 'dv_ratio',
515
+ '总市值': 'total_mv',
516
+ '流通市值': 'circ_mv',
517
+ }
518
+ df = df.rename(columns=rename_map)
519
+ df['trade_date'] = pd.to_datetime(df['trade_date'])
520
+ df['code'] = code
521
+
522
+ cols = ['code', 'trade_date', 'pe_ttm', 'pe_static', 'pb',
523
+ 'ps_ttm', 'dv_ratio', 'total_mv', 'circ_mv']
524
+ available_cols = [c for c in cols if c in df.columns]
525
+ return df[available_cols]
526
+ except Exception:
527
+ time.sleep(0.5)
528
+ return None
529
+
530
+
531
+ def sync_valuation(targets: List[Dict[str, str]], last_trade_day: str) -> int:
532
+ """同步估值指标数据(极致增量版)"""
533
+ logger.info("Syncing valuation data...")
534
+ val_path = Path("/tmp/data/valuation.parquet")
535
+ val_path.parent.mkdir(parents=True, exist_ok=True)
536
+
537
+ old_df = None
538
+ global_latest_date = "2000-01-01"
539
+ existing_codes = set()
540
+
541
+ # 1. 优先读取本地缓存
542
+ if val_path.exists():
543
+ try:
544
+ old_df = pd.read_parquet(val_path)
545
+ global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
546
+ existing_codes = set(old_df['code'].unique())
547
+ logger.info(f"Local valuation cache found, latest date: {global_latest_date}")
548
+ except Exception as e:
549
+ logger.warning(f"Failed to read local valuation cache: {e}")
550
+
551
+ # 2. 本地无缓存,尝试从云端拉取
552
+ if old_df is None:
553
+ repo_id = os.getenv("DATASET_REPO_ID")
554
+ if repo_id:
555
+ try:
556
+ old_file = hf_hub_download(repo_id=repo_id, filename="data/valuation.parquet", repo_type="dataset")
557
+ old_df = pd.read_parquet(old_file)
558
+ global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
559
+ existing_codes = set(old_df['code'].unique())
560
+ old_df.to_parquet(val_path)
561
+ logger.info(f"Downloaded valuation from cloud, latest date: {global_latest_date}")
562
+ except Exception:
563
+ logger.info("No existing valuation data found in cloud.")
564
+
565
+ # 3. 全局水位线拦截 + 新股检测
566
+ stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']]
567
+ new_codes = [t for t in stock_targets if t['code'] not in existing_codes]
568
+
569
+ if global_latest_date >= last_trade_day and not new_codes:
570
+ logger.info(f"Valuation data is already up to date ({global_latest_date}) and no new stocks. Skip.")
571
+ return 0
572
+
573
+ # 4. 增量获取
574
+ if global_latest_date >= last_trade_day:
575
+ logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.")
576
+ sync_targets = new_codes
577
+ else:
578
+ logger.info(f"Syncing valuation from {global_latest_date} to {last_trade_day}...")
579
+ sync_targets = stock_targets
580
+
581
+ all_data = []
582
+ success_count = 0
583
+
584
+ with ThreadPoolExecutor(max_workers=get_thread_config_safe()['valuation']) as executor:
585
+ futures = {executor.submit(get_stock_valuation, t['code']): t['code'] for t in sync_targets}
586
+ for i, future in enumerate(as_completed(futures), 1):
587
+ res = future.result()
588
+ if res is not None and not res.empty:
589
+ code = futures[future]
590
+ if code in existing_codes:
591
+ res = res[res['trade_date'] > pd.to_datetime(global_latest_date)]
592
+
593
+ if not res.empty:
594
+ all_data.append(res)
595
+ success_count += 1
596
+ if i % 500 == 0:
597
+ logger.info(f"Valuation progress: {i}/{len(sync_targets)}, success: {success_count}")
598
+
599
+ # 5. 合并保存
600
+ if all_data:
601
+ new_df = pd.concat(all_data, ignore_index=True)
602
+ final_df = pd.concat([old_df, new_df]) if old_df is not None else new_df
603
+ final_df = final_df.drop_duplicates(subset=['code', 'trade_date'])
604
+ final_df.to_parquet(val_path)
605
+ logger.info(f"Valuation updated: {len(final_df)} total records")
606
+
607
+ return success_count
608
+
609
+
610
+ # ==================== 新增:融资融券数据同步 ====================
611
+
612
+ def get_stock_margin(code: str) -> Optional[pd.DataFrame]:
613
+ """获取单只股票融资融券数据"""
614
+ max_retries = 3
615
+ for attempt in range(max_retries):
616
+ try:
617
+ # 尝试上交所
618
+ if code.startswith('6'):
619
+ df = ak.stock_margin_detail_sh(symbol=code)
620
+ else:
621
+ df = ak.stock_margin_detail_sz(symbol=code)
622
+
623
+ if df is not None and not df.empty:
624
+ # 标准化列名
625
+ rename_map = {
626
+ '日期': 'trade_date',
627
+ '融资余额': 'rzye',
628
+ '融资买入额': 'rzmre',
629
+ '融资偿还额': 'rzche',
630
+ '融券余额': 'rqye',
631
+ '融券卖出量': 'rqmcl',
632
+ '融资融券余额': 'rzrqye',
633
+ }
634
+ df = df.rename(columns=rename_map)
635
+ if 'trade_date' in df.columns:
636
+ df['trade_date'] = pd.to_datetime(df['trade_date'])
637
+ df['code'] = code
638
+
639
+ cols = ['code', 'trade_date', 'rzye', 'rzmre', 'rzche', 'rqye', 'rqmcl', 'rzrqye']
640
+ available_cols = [c for c in cols if c in df.columns]
641
+ return df[available_cols]
642
+ except Exception as e:
643
+ if attempt == max_retries - 1:
644
+ pass
645
+ time.sleep(0.5)
646
+ return None
647
+
648
+
649
+ def sync_margin(targets: List[Dict[str, str]], last_trade_day: str) -> int:
650
+ """同步融资融券数据(极致增量版)"""
651
+ logger.info("Syncing margin trading data...")
652
+ margin_path = Path("/tmp/data/margin.parquet")
653
+ margin_path.parent.mkdir(parents=True, exist_ok=True)
654
+
655
+ old_df = None
656
+ global_latest_date = "2000-01-01"
657
+ existing_codes = set()
658
+
659
+ # 1. 优先读取本地缓存
660
+ if margin_path.exists():
661
+ try:
662
+ old_df = pd.read_parquet(margin_path)
663
+ global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
664
+ existing_codes = set(old_df['code'].unique())
665
+ logger.info(f"Local margin cache found, latest date: {global_latest_date}")
666
+ except Exception as e:
667
+ logger.warning(f"Failed to read local margin cache: {e}")
668
+
669
+ # 2. 本地无缓存,尝试从云端拉取
670
+ if old_df is None:
671
+ repo_id = os.getenv("DATASET_REPO_ID")
672
+ if repo_id:
673
+ try:
674
+ old_file = hf_hub_download(repo_id=repo_id, filename="data/margin.parquet", repo_type="dataset")
675
+ old_df = pd.read_parquet(old_file)
676
+ global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
677
+ existing_codes = set(old_df['code'].unique())
678
+ old_df.to_parquet(margin_path)
679
+ logger.info(f"Downloaded margin from cloud, latest date: {global_latest_date}")
680
+ except Exception:
681
+ logger.info("No existing margin data found in cloud.")
682
+
683
+ # 3. 全局水位线拦截 + 新股检测
684
+ stock_targets = [t for t in targets if t['market'] in ['主板', '创业板', '科创板']]
685
+ new_codes = [t for t in stock_targets if t['code'] not in existing_codes]
686
+
687
+ if global_latest_date >= last_trade_day and not new_codes:
688
+ logger.info(f"Margin data is already up to date ({global_latest_date}) and no new stocks. Skip.")
689
+ return 0
690
+
691
+ # 4. 增量获取
692
+ if global_latest_date >= last_trade_day:
693
+ logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.")
694
+ sync_targets = new_codes
695
+ else:
696
+ logger.info(f"Syncing margin data from {global_latest_date} to {last_trade_day}...")
697
+ sync_targets = stock_targets
698
+
699
+ all_data = []
700
+ success_count = 0
701
 
702
+ with ThreadPoolExecutor(max_workers=get_thread_config_safe()['margin']) as executor:
703
+ futures = {executor.submit(get_stock_margin, t['code']): t['code'] for t in sync_targets}
704
+ for i, future in enumerate(as_completed(futures), 1):
705
+ res = future.result()
706
+ if res is not None and not res.empty:
707
+ code = futures[future]
708
+ if code in existing_codes:
709
+ res = res[res['trade_date'] > pd.to_datetime(global_latest_date)]
710
+
711
+ if not res.empty:
712
+ all_data.append(res)
713
+ success_count += 1
714
+ if i % 500 == 0:
715
+ logger.info(f"Margin progress: {i}/{len(sync_targets)}, success: {success_count}")
716
+
717
+ # 5. 合并保存
718
+ if all_data:
719
+ new_df = pd.concat(all_data, ignore_index=True)
720
+ final_df = pd.concat([old_df, new_df]) if old_df is not None else new_df
721
+ final_df = final_df.drop_duplicates(subset=['code', 'trade_date'])
722
+ final_df.to_parquet(margin_path)
723
+ logger.info(f"Margin updated: {len(final_df)} total records")
724
+
725
+ return success_count
726
+
727
+
728
+ # ==================== 新增:财务指标数据同步 ====================
729
+
730
+ def get_stock_financial_indicator(code: str) -> Optional[pd.DataFrame]:
731
+ """获取单只股票财务指标数据"""
732
+ max_retries = 3
733
+ for attempt in range(max_retries):
734
+ try:
735
+ df = ak.stock_financial_analysis_indicator(symbol=code)
736
+ if df is not None and not df.empty:
737
+ # 标准化列名
738
+ rename_map = {
739
+ '日期': 'trade_date',
740
+ '净资产收益率': 'roe',
741
+ '总资产净利率': 'roa',
742
+ '销售毛利率': 'gross_margin',
743
+ '销售净利率': 'net_margin',
744
+ '资产负债率': 'debt_ratio',
745
+ '流动比率': 'current_ratio',
746
+ '速动比率': 'quick_ratio',
747
+ '存货周转率': 'inventory_turnover',
748
+ '应收账款周转率': 'receivable_turnover',
749
+ '总资产周转率': 'total_asset_turnover',
750
+ }
751
+ df = df.rename(columns=rename_map)
752
+ if 'trade_date' in df.columns:
753
+ df['trade_date'] = pd.to_datetime(df['trade_date'])
754
+ df['code'] = code
755
+
756
+ cols = ['code', 'trade_date', 'roe', 'roa', 'gross_margin', 'net_margin',
757
+ 'debt_ratio', 'current_ratio', 'quick_ratio',
758
+ 'inventory_turnover', 'receivable_turnover', 'total_asset_turnover']
759
+ available_cols = [c for c in cols if c in df.columns]
760
+ return df[available_cols]
761
+ except Exception:
762
+ time.sleep(0.5)
763
+ return None
764
+
765
+
766
+ def sync_financial_indicator(targets: List[Dict[str, str]]) -> int:
767
+ """同步财务指标数据(极致增量版)"""
768
+ logger.info("Syncing financial indicator data...")
769
+ fi_path = Path("/tmp/data/financial_indicator.parquet")
770
+ fi_path.parent.mkdir(parents=True, exist_ok=True)
771
+
772
+ old_df = None
773
+ global_latest_date = "2000-01-01"
774
+ existing_codes = set()
775
+
776
+ # 1. 优先读取本地缓存
777
+ if fi_path.exists():
778
+ try:
779
+ old_df = pd.read_parquet(fi_path)
780
+ global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
781
+ existing_codes = set(old_df['code'].unique())
782
+ logger.info(f"Local financial cache found, latest date: {global_latest_date}")
783
+ except Exception as e:
784
+ logger.warning(f"Failed to read local financial cache: {e}")
785
+
786
+ # 2. 本地无缓存,尝试从云端拉取
787
+ if old_df is None:
788
+ repo_id = os.getenv("DATASET_REPO_ID")
789
+ if repo_id:
790
+ try:
791
+ old_file = hf_hub_download(repo_id=repo_id, filename="data/financial_indicator.parquet", repo_type="dataset")
792
+ old_df = pd.read_parquet(old_file)
793
+ global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
794
+ existing_codes = set(old_df['code'].unique())
795
+ old_df.to_parquet(fi_path)
796
+ logger.info(f"Downloaded financial from cloud, latest date: {global_latest_date}")
797
+ except Exception:
798
+ logger.info("No existing financial data found in cloud.")
799
+
800
+ # 3. 财务指标特殊拦截 + 新股检测
801
+ stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']]
802
+ new_codes = [t for t in stock_targets if t['code'] not in existing_codes]
803
+
804
+ today = get_beijing_time()
805
+ is_recent = False
806
+ if global_latest_date != "2000-01-01":
807
+ days_diff = (today - pd.to_datetime(global_latest_date)).days
808
+ if days_diff < 90:
809
+ is_recent = True
810
+
811
+ if is_recent and not new_codes:
812
+ logger.info(f"Financial data is recent ({global_latest_date}) and no new stocks. Skip.")
813
+ return 0
814
+
815
+ # 4. 增量获取
816
+ if is_recent:
817
+ logger.info(f"Financial data is recent, but found {len(new_codes)} new stocks. Syncing new stocks only.")
818
+ sync_targets = new_codes
819
+ else:
820
+ logger.info(f"Syncing financial indicators (last update: {global_latest_date})...")
821
+ sync_targets = stock_targets
822
+
823
+ all_data = []
824
+ success_count = 0
825
+
826
+ with ThreadPoolExecutor(max_workers=get_thread_config_safe()['financial']) as executor:
827
+ futures = {executor.submit(get_stock_financial_indicator, t['code']): t['code'] for t in sync_targets}
828
+ for i, future in enumerate(as_completed(futures), 1):
829
+ res = future.result()
830
+ if res is not None and not res.empty:
831
+ code = futures[future]
832
+ if code in existing_codes:
833
+ res = res[res['trade_date'] > pd.to_datetime(global_latest_date)]
834
+
835
+ if not res.empty:
836
+ all_data.append(res)
837
+ success_count += 1
838
+ if i % 500 == 0:
839
+ logger.info(f"Financial indicator progress: {i}/{len(sync_targets)}, success: {success_count}")
840
+
841
+ # 5. 合并保存
842
+ if all_data:
843
+ new_df = pd.concat(all_data, ignore_index=True)
844
+ final_df = pd.concat([old_df, new_df]) if old_df is not None else new_df
845
+ final_df = final_df.drop_duplicates(subset=['code', 'trade_date'])
846
+ final_df.to_parquet(fi_path)
847
+ logger.info(f"Financial updated: {len(final_df)} total records")
848
+
849
+ return success_count
850
+
851
+
852
+ # ==================== 新增:股东户数数据同步 ====================
853
+
854
+ def sync_holder_num() -> int:
855
+ """同步股东户数数据(批量获取)"""
856
+ logger.info("Syncing holder number data...")
857
+
858
+ try:
859
+ # 获取最近报告期的股东户数数据
860
+ df = ak.stock_zh_a_gdhs(symbol="全部")
861
+ if df is not None and not df.empty:
862
+ # 标准化列名
863
+ rename_map = {
864
+ '代码': 'code',
865
+ '股东户数': 'holder_num',
866
+ '户均持股数量': 'avg_share',
867
+ '户均持股金额': 'avg_value',
868
+ '总股本': 'total_share',
869
+ '总市值': 'total_value',
870
+ '日期': 'trade_date',
871
+ }
872
+ df = df.rename(columns=rename_map)
873
+ if 'trade_date' in df.columns:
874
+ df['trade_date'] = pd.to_datetime(df['trade_date'])
875
+
876
+ # 保存到 Parquet
877
+ hn_path = Path("/tmp/data/holder_num.parquet")
878
+ hn_path.parent.mkdir(parents=True, exist_ok=True)
879
+ df.to_parquet(hn_path)
880
+ logger.info(f"Holder number data saved: {len(df)} records")
881
+ return len(df)
882
+ except Exception as e:
883
+ logger.warning(f"Failed to sync holder number data: {e}")
884
+
885
+ return 0
886
+
887
+
888
+ # ==================== 新增:分红数据同步 ====================
889
+
890
+ def get_stock_dividend(code: str) -> Optional[pd.DataFrame]:
891
+ """获取单只股票分红数据"""
892
+ max_retries = 3
893
+ for attempt in range(max_retries):
894
+ try:
895
+ df = ak.stock_history_dividend(symbol=code)
896
+ if df is not None and not df.empty:
897
+ # 标准化列名
898
+ rename_map = {
899
+ '公告日期': 'trade_date',
900
+ '分红方案': 'dividend_type',
901
+ '分红金额': 'dividend_amount',
902
+ '股权登记日': 'record_date',
903
+ '除权除息日': 'ex_date',
904
+ '派息日': 'pay_date',
905
+ }
906
+ df = df.rename(columns=rename_map)
907
+ df['trade_date'] = pd.to_datetime(df['trade_date'])
908
+ df['code'] = code
909
+
910
+ cols = ['code', 'trade_date', 'dividend_type', 'dividend_amount', 'record_date', 'ex_date', 'pay_date']
911
+ available_cols = [c for c in cols if c in df.columns]
912
+ return df[available_cols]
913
+ except Exception:
914
+ time.sleep(0.5)
915
+ return None
916
+
917
+
918
+ def sync_dividend(targets: List[Dict[str, str]]) -> int:
919
+ """同步分红数据(极致增量版)"""
920
+ logger.info("Syncing dividend data...")
921
+ div_path = Path("/tmp/data/dividend.parquet")
922
+ div_path.parent.mkdir(parents=True, exist_ok=True)
923
+
924
+ old_df = None
925
+ global_latest_date = "2000-01-01"
926
+ existing_codes = set()
927
+
928
+ if div_path.exists():
929
+ try:
930
+ old_df = pd.read_parquet(div_path)
931
+ global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
932
+ existing_codes = set(old_df['code'].unique())
933
+ except Exception: pass
934
+
935
+ if old_df is None:
936
+ repo_id = os.getenv("DATASET_REPO_ID")
937
+ if repo_id:
938
+ try:
939
+ old_file = hf_hub_download(repo_id=repo_id, filename="data/dividend.parquet", repo_type="dataset")
940
+ old_df = pd.read_parquet(old_file)
941
+ global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
942
+ existing_codes = set(old_df['code'].unique())
943
+ old_df.to_parquet(div_path)
944
+ except Exception: pass
945
+
946
+ # 90天检查一次 + 新股检测
947
+ stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']]
948
+ new_codes = [t for t in stock_targets if t['code'] not in existing_codes]
949
+
950
+ today = get_beijing_time()
951
+ is_recent = False
952
+ if global_latest_date != "2000-01-01":
953
+ if (today - pd.to_datetime(global_latest_date)).days < 90:
954
+ is_recent = True
955
+
956
+ if is_recent and not new_codes:
957
+ logger.info(f"Dividend data is recent and no new stocks. Skip.")
958
+ return 0
959
+
960
+ # 4. 增量获取
961
+ if is_recent:
962
+ logger.info(f"Dividend data is recent, but found {len(new_codes)} new stocks. Syncing new stocks only.")
963
+ sync_targets = new_codes
964
+ else:
965
+ logger.info(f"Syncing dividend data (last update: {global_latest_date})...")
966
+ sync_targets = stock_targets
967
+
968
+ all_data = []
969
+ success_count = 0
970
+
971
+ with ThreadPoolExecutor(max_workers=get_thread_config_safe()['dividend']) as executor:
972
+ futures = {executor.submit(get_stock_dividend, t['code']): t['code'] for t in sync_targets}
973
+ for i, future in enumerate(as_completed(futures), 1):
974
+ res = future.result()
975
+ if res is not None and not res.empty:
976
+ code = futures[future]
977
+ if code in existing_codes:
978
+ res = res[res['trade_date'] > pd.to_datetime(global_latest_date)]
979
+
980
+ if not res.empty:
981
+ all_data.append(res)
982
+ success_count += 1
983
+ if i % 500 == 0:
984
+ logger.info(f"Dividend progress: {i}/{len(sync_targets)}, success: {success_count}")
985
+
986
+ if all_data:
987
+ new_df = pd.concat(all_data, ignore_index=True)
988
+ final_df = pd.concat([old_df, new_df]) if old_df is not None else new_df
989
+ final_df = final_df.drop_duplicates(subset=['code', 'trade_date', 'dividend_type'])
990
+ final_df.to_parquet(div_path)
991
+ logger.info(f"Dividend updated: {len(final_df)} total records")
992
+
993
+ return success_count
994
+
995
+
996
+ # ==================== 新增:十大股东数据同步 ====================
997
+
998
+ def sync_top_holders() -> int:
999
+ """同步十大股东数据(批量获取)"""
1000
+ logger.info("Syncing top holders data...")
1001
+
1002
+ try:
1003
+ today = get_beijing_time()
1004
+ df = ak.stock_gdfx_holding_analyse_em(date=today.strftime('%Y%m%d'))
1005
+ if df is not None and not df.empty:
1006
+ rename_map = {
1007
+ '股票代码': 'code',
1008
+ '公告日期': 'trade_date',
1009
+ '股东名称': 'holder_name',
1010
+ '持股数量': 'hold_num',
1011
+ '持股比例': 'hold_ratio',
1012
+ '持股变动': 'hold_change',
1013
+ }
1014
+ df = df.rename(columns=rename_map)
1015
+ df['trade_date'] = pd.to_datetime(df['trade_date'])
1016
+
1017
+ path = Path("/tmp/data/top_holders.parquet")
1018
+ path.parent.mkdir(parents=True, exist_ok=True)
1019
+ df.to_parquet(path)
1020
+ logger.info(f"Top holders data saved: {len(df)} records")
1021
+ return len(df)
1022
+ except Exception as e:
1023
+ logger.warning(f"Failed to sync top holders: {e}")
1024
+ return 0
1025
+
1026
+
1027
+ # ==================== 新增:限售解禁数据同步 ====================
1028
+
1029
+ def sync_restricted_unlock() -> int:
1030
+ """同步限售解禁数据(批量获取)"""
1031
+ logger.info("Syncing restricted unlock data...")
1032
+ path = Path("/tmp/data/restricted_unlock.parquet")
1033
+ path.parent.mkdir(parents=True, exist_ok=True)
1034
+
1035
+ try:
1036
+ # 获取全市场限售解禁数据
1037
+ df = ak.stock_restricted_shares(stock="all")
1038
+ if df is not None and not df.empty:
1039
+ rename_map = {
1040
+ '代码': 'code',
1041
+ '名称': 'name',
1042
+ '解禁日期': 'unlock_date',
1043
+ '解禁数量': 'unlock_num',
1044
+ '解禁股本占总股本比例': 'unlock_ratio',
1045
+ }
1046
+ df = df.rename(columns=rename_map)
1047
+ df['unlock_date'] = pd.to_datetime(df['unlock_date'])
1048
+ df['trade_date'] = get_beijing_time() # 记录同步日期
1049
+
1050
+ df.to_parquet(path)
1051
+ logger.info(f"Restricted unlock data saved: {len(df)} records")
1052
+ return len(df)
1053
+ except Exception as e:
1054
+ logger.warning(f"Failed to sync restricted unlock: {e}")
1055
+ return 0
1056
+
1057
+
1058
+ def main() -> int:
1059
+ """
1060
+ 主函数 - 执行完整的数据同步流程
1061
+
1062
+ Returns:
1063
+ int: 退出码,0 表示成功,1 表示失败
1064
+ """
1065
  logger.info("=" * 60)
1066
+ logger.info("Stock Data Sync Started")
1067
  logger.info("=" * 60)
1068
+
1069
+ try:
1070
+ # 初始化线程配置
1071
+ init_thread_config()
1072
+
1073
+ db = get_db()
1074
+ db.init_db()
1075
+
1076
+ # 1. 列表同步
1077
+ target_list = get_stock_list()
1078
+ list_parquet = Path("/tmp/data/stock_list.parquet")
1079
+ list_parquet.parent.mkdir(parents=True, exist_ok=True)
1080
+ target_list.to_parquet(list_parquet)
1081
+
1082
+ # 2. 行情同步
1083
+ last_day = get_last_trading_day()
1084
+ logger.info(f"Last trading day: {last_day}")
1085
+ sync_count = sync_stock_daily(target_list.to_dict('records'), last_day)
1086
+
1087
+ # 3. 指数同步
1088
+ idx_df = get_index_daily('000300')
1089
+ if idx_df is not None:
1090
+ idx_path = Path("/tmp/data/parquet/index_000300.parquet")
1091
+ idx_path.parent.mkdir(parents=True, exist_ok=True)
1092
+ idx_df.to_parquet(idx_path)
1093
+
1094
+ # 4-10. 各类指标同步
1095
+ logger.info("-" * 40)
1096
+ fund_flow_count = sync_fund_flow(target_list.to_dict('records'), last_day)
1097
+
1098
+ logger.info("-" * 40)
1099
+ valuation_count = sync_valuation(target_list.to_dict('records'), last_day)
1100
+
1101
+ logger.info("-" * 40)
1102
+ margin_count = sync_margin(target_list.to_dict('records'), last_day)
1103
+
1104
+ logger.info("-" * 40)
1105
+ financial_count = sync_financial_indicator(target_list.to_dict('records'))
1106
+
1107
+ logger.info("-" * 40)
1108
+ holder_count = sync_holder_num()
1109
+
1110
+ logger.info("-" * 40)
1111
+ dividend_count = sync_dividend(target_list.to_dict('records'))
1112
+
1113
+ logger.info("-" * 40)
1114
+ top_holders_count = sync_top_holders()
1115
+
1116
+ logger.info("-" * 40)
1117
+ restricted_count = sync_restricted_unlock()
1118
+
1119
+ # 11. 上传
1120
+ logger.info("-" * 40)
1121
+ logger.info("Uploading to Hugging Face Dataset...")
1122
+ db.upload_db()
1123
+
1124
+ logger.info("=" * 60)
1125
+ logger.info("Sync Completed Successfully!")
1126
+ summary = (f"Daily={sync_count}, FundFlow={fund_flow_count}, Valuation={valuation_count}, "
1127
+ f"Margin={margin_count}, Financial={financial_count}, Holder={holder_count}, "
1128
+ f"Dividend={dividend_count}, TopHolders={top_holders_count}, Restricted={restricted_count}")
1129
+ logger.info(f"Summary: {summary}")
1130
+ logger.info("=" * 60)
1131
+ return 0
1132
+
1133
+ except Exception as e:
1134
+ logger.error(f"Sync failed with error: {e}")
1135
+ return 1
1136
 
1137
  if __name__ == "__main__":
1138
+ sys.exit(main())