superxu520 commited on
Commit
892cb58
·
1 Parent(s): 2aa8ac4

"feat:upload-after-each-indicator"

Browse files
Files changed (2) hide show
  1. app/database.py +48 -1
  2. sync_data.py +62 -7
app/database.py CHANGED
@@ -200,7 +200,7 @@ class DatabaseManager:
200
 
201
  # 5. 上传融资融券数据(按月分表)
202
  margin_dir = Path(os.path.dirname(DUCKDB_PATH)) / "margin"
203
- if margindir.exists():
204
  for mar_file in margin_dir.glob("*.parquet"):
205
  upload_file(
206
  path_or_fileobj=str(mar_file),
@@ -271,6 +271,53 @@ class DatabaseManager:
271
  finally:
272
  _ = self.conn
273
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
274
  def _create_tables(self) -> None:
275
  """创建数据库表结构"""
276
  conn = self.conn
 
200
 
201
  # 5. 上传融资融券数据(按月分表)
202
  margin_dir = Path(os.path.dirname(DUCKDB_PATH)) / "margin"
203
+ if margin_dir.exists():
204
  for mar_file in margin_dir.glob("*.parquet"):
205
  upload_file(
206
  path_or_fileobj=str(mar_file),
 
271
  finally:
272
  _ = self.conn
273
 
274
+ def upload_indicator(self, indicator_name: str, local_path: Path, remote_path: str) -> bool:
275
+ """
276
+ 上传单个指标数据到 HF Dataset
277
+
278
+ Args:
279
+ indicator_name: 指标名称(用于日志)
280
+ local_path: 本地文件或目录路径
281
+ remote_path: 远程路径前缀(如 "data/fund_flow")
282
+
283
+ Returns:
284
+ bool: 是否上传成功
285
+ """
286
+ if not HF_TOKEN or not DATASET_REPO_ID:
287
+ logger.warning("HF_TOKEN or DATASET_REPO_ID not set, skipping upload")
288
+ return False
289
+
290
+ try:
291
+ login(token=HF_TOKEN)
292
+
293
+ if local_path.is_file():
294
+ # 单文件上传
295
+ upload_file(
296
+ path_or_fileobj=str(local_path),
297
+ path_in_repo=f"{remote_path}/{local_path.name}",
298
+ repo_id=DATASET_REPO_ID,
299
+ repo_type="dataset",
300
+ )
301
+ logger.info(f"{indicator_name} uploaded: {local_path.name}")
302
+ elif local_path.is_dir():
303
+ # 目录上传(上传所有 parquet 文件)
304
+ uploaded_count = 0
305
+ for p_file in local_path.glob("*.parquet"):
306
+ upload_file(
307
+ path_or_fileobj=str(p_file),
308
+ path_in_repo=f"{remote_path}/{p_file.name}",
309
+ repo_id=DATASET_REPO_ID,
310
+ repo_type="dataset",
311
+ )
312
+ uploaded_count += 1
313
+ if uploaded_count > 0:
314
+ logger.info(f"{indicator_name} uploaded: {uploaded_count} files")
315
+
316
+ return True
317
+ except Exception as e:
318
+ logger.error(f"Failed to upload {indicator_name}: {e}")
319
+ return False
320
+
321
  def _create_tables(self) -> None:
322
  """创建数据库表结构"""
323
  conn = self.conn
sync_data.py CHANGED
@@ -1318,7 +1318,7 @@ def sync_restricted_unlock() -> int:
1318
 
1319
  def main() -> int:
1320
  """
1321
- 主函数 - 执行完整的数据同步流程
1322
 
1323
  Returns:
1324
  int: 退出码,0 表示成功,1 表示失败
@@ -1334,16 +1334,33 @@ def main() -> int:
1334
  db = get_db()
1335
  db.init_db()
1336
 
 
 
 
 
 
 
 
 
 
 
 
1337
  # 1. 列表同步
1338
  target_list = get_stock_list()
1339
  list_parquet = Path("/tmp/data/stock_list.parquet")
1340
  list_parquet.parent.mkdir(parents=True, exist_ok=True)
1341
  target_list.to_parquet(list_parquet)
 
1342
 
1343
  # 2. 行情同步
1344
  last_day = get_last_trading_day()
1345
  logger.info(f"Last trading day: {last_day}")
1346
  sync_count = sync_stock_daily(target_list.to_dict('records'), last_day)
 
 
 
 
 
1347
 
1348
  # 3. 指数同步
1349
  idx_df = get_index_daily('000300')
@@ -1351,36 +1368,74 @@ def main() -> int:
1351
  idx_path = Path("/tmp/data/parquet/index_000300.parquet")
1352
  idx_path.parent.mkdir(parents=True, exist_ok=True)
1353
  idx_df.to_parquet(idx_path)
 
1354
 
1355
- # 4-10. 各类指标同步
1356
  logger.info("-" * 40)
1357
  fund_flow_count = sync_fund_flow(target_list.to_dict('records'), last_day)
 
 
 
 
 
1358
 
 
1359
  logger.info("-" * 40)
1360
  valuation_count = sync_valuation(target_list.to_dict('records'), last_day)
 
 
 
 
 
1361
 
 
1362
  logger.info("-" * 40)
1363
  margin_count = sync_margin(target_list.to_dict('records'), last_day)
 
 
 
 
 
1364
 
 
1365
  logger.info("-" * 40)
1366
  financial_count = sync_financial_indicator(target_list.to_dict('records'))
 
 
 
 
1367
 
 
1368
  logger.info("-" * 40)
1369
  holder_count = sync_holder_num()
 
 
 
 
1370
 
 
1371
  logger.info("-" * 40)
1372
  dividend_count = sync_dividend(target_list.to_dict('records'))
 
 
 
 
1373
 
 
1374
  logger.info("-" * 40)
1375
  top_holders_count = sync_top_holders()
 
 
 
 
1376
 
 
1377
  logger.info("-" * 40)
1378
  restricted_count = sync_restricted_unlock()
1379
-
1380
- # 11. 上传
1381
- logger.info("-" * 40)
1382
- logger.info("Uploading to Hugging Face Dataset...")
1383
- db.upload_db()
1384
 
1385
  logger.info("=" * 60)
1386
  logger.info("Sync Completed Successfully!")
 
1318
 
1319
  def main() -> int:
1320
  """
1321
+ 主函数 - 执行完整的数据同步流程(每类指标完成后即时上传)
1322
 
1323
  Returns:
1324
  int: 退出码,0 表示成功,1 表示失败
 
1334
  db = get_db()
1335
  db.init_db()
1336
 
1337
+ # 统计变量
1338
+ sync_count = 0
1339
+ fund_flow_count = 0
1340
+ valuation_count = 0
1341
+ margin_count = 0
1342
+ financial_count = 0
1343
+ holder_count = 0
1344
+ dividend_count = 0
1345
+ top_holders_count = 0
1346
+ restricted_count = 0
1347
+
1348
  # 1. 列表同步
1349
  target_list = get_stock_list()
1350
  list_parquet = Path("/tmp/data/stock_list.parquet")
1351
  list_parquet.parent.mkdir(parents=True, exist_ok=True)
1352
  target_list.to_parquet(list_parquet)
1353
+ db.upload_indicator("Stock List", list_parquet, "data")
1354
 
1355
  # 2. 行情同步
1356
  last_day = get_last_trading_day()
1357
  logger.info(f"Last trading day: {last_day}")
1358
  sync_count = sync_stock_daily(target_list.to_dict('records'), last_day)
1359
+ # 上传日K行情数据
1360
+ parquet_dir = Path("/tmp/data/parquet")
1361
+ if parquet_dir.exists():
1362
+ for p_file in parquet_dir.glob("*.parquet"):
1363
+ db.upload_indicator("Daily Data", p_file, "data/parquet")
1364
 
1365
  # 3. 指数同步
1366
  idx_df = get_index_daily('000300')
 
1368
  idx_path = Path("/tmp/data/parquet/index_000300.parquet")
1369
  idx_path.parent.mkdir(parents=True, exist_ok=True)
1370
  idx_df.to_parquet(idx_path)
1371
+ db.upload_indicator("Index Data", idx_path, "data/parquet")
1372
 
1373
+ # 4. 资金流向同步
1374
  logger.info("-" * 40)
1375
  fund_flow_count = sync_fund_flow(target_list.to_dict('records'), last_day)
1376
+ # 即时上传
1377
+ fund_flow_dir = Path("/tmp/data/fund_flow")
1378
+ if fund_flow_dir.exists() and any(fund_flow_dir.glob("*.parquet")):
1379
+ for ff_file in fund_flow_dir.glob("*.parquet"):
1380
+ db.upload_indicator("Fund Flow", ff_file, "data/fund_flow")
1381
 
1382
+ # 5. 估值指标同步
1383
  logger.info("-" * 40)
1384
  valuation_count = sync_valuation(target_list.to_dict('records'), last_day)
1385
+ # 即时上传
1386
+ valuation_dir = Path("/tmp/data/valuation")
1387
+ if valuation_dir.exists() and any(valuation_dir.glob("*.parquet")):
1388
+ for val_file in valuation_dir.glob("*.parquet"):
1389
+ db.upload_indicator("Valuation", val_file, "data/valuation")
1390
 
1391
+ # 6. 融资融券同步
1392
  logger.info("-" * 40)
1393
  margin_count = sync_margin(target_list.to_dict('records'), last_day)
1394
+ # 即时上传
1395
+ margin_dir = Path("/tmp/data/margin")
1396
+ if margin_dir.exists() and any(margin_dir.glob("*.parquet")):
1397
+ for mar_file in margin_dir.glob("*.parquet"):
1398
+ db.upload_indicator("Margin", mar_file, "data/margin")
1399
 
1400
+ # 7. 财务指标同步
1401
  logger.info("-" * 40)
1402
  financial_count = sync_financial_indicator(target_list.to_dict('records'))
1403
+ # 即时上传
1404
+ fi_path = Path("/tmp/data/financial_indicator.parquet")
1405
+ if fi_path.exists():
1406
+ db.upload_indicator("Financial Indicator", fi_path, "data")
1407
 
1408
+ # 8. 股东户数同步
1409
  logger.info("-" * 40)
1410
  holder_count = sync_holder_num()
1411
+ # 即时上传
1412
+ holder_path = Path("/tmp/data/holder_num.parquet")
1413
+ if holder_path.exists():
1414
+ db.upload_indicator("Holder Num", holder_path, "data")
1415
 
1416
+ # 9. 分红数据同步
1417
  logger.info("-" * 40)
1418
  dividend_count = sync_dividend(target_list.to_dict('records'))
1419
+ # 即时上传
1420
+ div_path = Path("/tmp/data/dividend.parquet")
1421
+ if div_path.exists():
1422
+ db.upload_indicator("Dividend", div_path, "data")
1423
 
1424
+ # 10. 十大股东同步
1425
  logger.info("-" * 40)
1426
  top_holders_count = sync_top_holders()
1427
+ # 即时上传
1428
+ top_holders_path = Path("/tmp/data/top_holders.parquet")
1429
+ if top_holders_path.exists():
1430
+ db.upload_indicator("Top Holders", top_holders_path, "data")
1431
 
1432
+ # 11. 限售解禁同步
1433
  logger.info("-" * 40)
1434
  restricted_count = sync_restricted_unlock()
1435
+ # 即时上传
1436
+ restricted_path = Path("/tmp/data/restricted_unlock.parquet")
1437
+ if restricted_path.exists():
1438
+ db.upload_indicator("Restricted Unlock", restricted_path, "data")
 
1439
 
1440
  logger.info("=" * 60)
1441
  logger.info("Sync Completed Successfully!")