chuan commited on
Commit
97507d2
·
1 Parent(s): c3b0483

feat: add cloud-to-dataset sync and local download script

Browse files
README.md CHANGED
@@ -39,6 +39,28 @@ pinned: false
39
 
40
  - **系统日志 (系统日志/)**: 统一存储各组件运行产生的日志。
41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42
  ## 开发指南
43
 
44
  1. **环境准备**: 建议使用 Python 3.14+ 环境。
 
39
 
40
  - **系统日志 (系统日志/)**: 统一存储各组件运行产生的日志。
41
 
42
+ ## 云端采集与数据同步 (New)
43
+
44
+ 本系统支持在 Hugging Face Spaces 上进行 7x24 小时自动行情采集,并自动同步到 Hugging Face Dataset。
45
+
46
+ ### 1. 云端配置 (Hugging Face)
47
+ 在 Space 的 **Settings** -> **Variables and secrets** 中添加以下 Secrets:
48
+ - `SUPABASE_URL`: 你的 Supabase 项目 URL
49
+ - `SUPABASE_ANON_KEY`: 你的 Supabase Anon Key
50
+ - `HF_TOKEN`: **必须**。具有 `Write` 权限的 Hugging Face Token,用于将数据上传到 Dataset。
51
+
52
+ ### 2. 数据流向
53
+ 1. **采集**: `服务/数据采集/启动采集.py` 实时采集 Binance 数据并保存为 `.parquet` 碎片。
54
+ 2. **整理**: `app.py` 每 12 小时触发一次 `整理行情数据.py`,将碎片合并为每日文件。
55
+ 3. **同步**: 整理完成后,自动运行 `hf_sync.py` 将数据推送到数据集 `chenchuanshen/Quant_Market_Data`。
56
+
57
+ ### 3. 本地获取数据
58
+ 在本地项目根目录下运行:
59
+ ```bash
60
+ python 下载云端数据.py
61
+ ```
62
+ 该脚本会自动对比云端与本地差异,只下载新增的行情数据。
63
+
64
  ## 开发指南
65
 
66
  1. **环境准备**: 建议使用 Python 3.14+ 环境。
app.py CHANGED
@@ -48,9 +48,33 @@ def 启动后台采集():
48
  for line in process.stdout:
49
  print(f"[Collector] {line.strip()}")
50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51
  # 启动后台线程
52
- thread = threading.Thread(target=启动后台采集, daemon=True)
53
- thread.start()
 
 
 
54
 
55
  # ==========================================
56
  # 2. UI 逻辑
 
48
  for line in process.stdout:
49
  print(f"[Collector] {line.strip()}")
50
 
51
+ def 周期性整理与同步():
52
+ """每隔 12 小时执行一次数据整理与 HF 同步"""
53
+ import sys
54
+ organize_script = os.path.join("服务", "数据采集", "整理行情数据.py")
55
+ env = os.environ.copy()
56
+ env["PYTHONPATH"] = os.getcwd()
57
+
58
+ while True:
59
+ # 等待一段时间再执行第一次(让采集运行一会儿)
60
+ time.sleep(60) # 启动后 1 分钟先跑一次
61
+ print("🕒 开始执行周期性数据整理与同步...")
62
+ try:
63
+ # 运行整理脚本,默认会触发 sync-hf (因为我们在脚本里改了默认值为 True)
64
+ subprocess.run([sys.executable, organize_script], env=env, check=True)
65
+ print("✅ 周期性整理与同步完成。")
66
+ except Exception as e:
67
+ print(f"❌ 周期性整理失败: {e}")
68
+
69
+ # 每 12 小时运行一次
70
+ time.sleep(12 * 3600)
71
+
72
  # 启动后台线程
73
+ thread_collector = threading.Thread(target=启动后台采集, daemon=True)
74
+ thread_collector.start()
75
+
76
+ thread_sync = threading.Thread(target=周期性整理与同步, daemon=True)
77
+ thread_sync.start()
78
 
79
  # ==========================================
80
  # 2. UI 逻辑
requirements.txt CHANGED
@@ -20,6 +20,7 @@ pydantic
20
  beautifulsoup4
21
  lxml
22
  plotly
 
23
  SQLAlchemy
24
  # Other necessary libs from original list
25
  aiodns
 
20
  beautifulsoup4
21
  lxml
22
  plotly
23
+ huggingface_hub
24
  SQLAlchemy
25
  # Other necessary libs from original list
26
  aiodns
下载云端数据.py ADDED
@@ -0,0 +1,48 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Quant Unified 量化交易系统
3
+ 下载云端数据 (HF Dataset -> Local)
4
+ """
5
+ import os
6
+ from pathlib import Path
7
+ from huggingface_hub import snapshot_download
8
+ import logging
9
+
10
+ # 配置日志
11
+ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
12
+ logger = logging.getLogger(__name__)
13
+
14
+ # 配置区域
15
+ # ---------------------------------------------------------
16
+ # 数据集名称
17
+ DATASET_REPO = "chenchuanshen/Quant_Market_Data"
18
+ # 本地行情数据存放路径
19
+ LOCAL_DATA_DIR = Path(__file__).resolve().parent / "data" / "行情数据_整理"
20
+ # ---------------------------------------------------------
21
+
22
+ def download_data():
23
+ """从 Hugging Face Dataset 下载/同步数据到本地"""
24
+ logger.info(f"🔍 正在检查云端数据集: {DATASET_REPO}...")
25
+
26
+ # 确保本地目录存在
27
+ LOCAL_DATA_DIR.mkdir(parents=True, exist_ok=True)
28
+
29
+ try:
30
+ # 使用 snapshot_download 自动对比并下载增量数据
31
+ # ignore_patterns 可以排除一些不必要的文件
32
+ local_path = snapshot_download(
33
+ repo_id=DATASET_REPO,
34
+ repo_type="dataset",
35
+ local_dir=str(LOCAL_DATA_DIR),
36
+ local_dir_use_symlinks=False, # 直接拷贝文件
37
+ # token=os.getenv("HF_TOKEN") # 如果是私有数据集需要 Token
38
+ )
39
+
40
+ logger.info(f"✨ 同步完成!数据已保存至: {local_path}")
41
+ return True
42
+ except Exception as e:
43
+ logger.error(f"❌ 下载失败: {e}")
44
+ logger.info("💡 提示: 如果是私有数据集,请先运行 `huggingface-cli login` 或设置 HF_TOKEN 环境变量")
45
+ return False
46
+
47
+ if __name__ == "__main__":
48
+ download_data()
服务/数据采集/hf_sync.py ADDED
@@ -0,0 +1,76 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Quant Unified 量化交易系统
3
+ Hugging Face Dataset 同步工具
4
+ """
5
+ import os
6
+ import shutil
7
+ from pathlib import Path
8
+ from huggingface_hub import HfApi, create_repo
9
+ from datetime import datetime
10
+ import logging
11
+
12
+ # 配置日志
13
+ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
14
+ logger = logging.getLogger(__name__)
15
+
16
+ # 配置区域
17
+ # ---------------------------------------------------------
18
+ # 数据集名称: 用户名/数据集名
19
+ DATASET_REPO = "chenchuanshen/Quant_Market_Data"
20
+ # 本地行情数据路径
21
+ LOCAL_DATA_DIR = Path(__file__).resolve().parents[2] / "data" / "行情数据_整理"
22
+ # ---------------------------------------------------------
23
+
24
+ def sync_to_hf():
25
+ """将本地整理好的数据同步到 Hugging Face Dataset"""
26
+ token = os.getenv("HF_TOKEN")
27
+ if not token:
28
+ logger.error("❌ 未发现 HF_TOKEN 环境变量,请在 Space Settings 中添加 Secret: HF_TOKEN (需要 Write 权限)")
29
+ return False
30
+
31
+ api = HfApi(token=token)
32
+
33
+ # 1. 确保仓库存在
34
+ try:
35
+ create_repo(repo_id=DATASET_REPO, repo_type="dataset", exist_ok=True)
36
+ logger.info(f"✅ 数据集仓库已就绪: {DATASET_REPO}")
37
+ except Exception as e:
38
+ logger.error(f"❌ 创建/访问仓库失败: {e}")
39
+ return False
40
+
41
+ # 2. 扫描本地整理好的数据文件
42
+ if not LOCAL_DATA_DIR.exists():
43
+ logger.warning(f"⚠️ 本地整理目录不存在: {LOCAL_DATA_DIR}")
44
+ return False
45
+
46
+ files_to_upload = list(LOCAL_DATA_DIR.rglob("*.parquet"))
47
+ if not files_to_upload:
48
+ logger.info("ℹ️ 没有发现需要上传的 .parquet 文件")
49
+ return True
50
+
51
+ logger.info(f"🚀 准备同步 {len(files_to_upload)} 个文件到云端...")
52
+
53
+ # 3. 批量上传
54
+ try:
55
+ # 我们按日期分目录上传,保持目录结构
56
+ # 这里的 path_in_repo 会保持 LOCAL_DATA_DIR 之后的相对路径
57
+ for file_path in files_to_upload:
58
+ relative_path = file_path.relative_to(LOCAL_DATA_DIR)
59
+ path_in_repo = str(relative_path)
60
+
61
+ logger.info(f"正在上传: {path_in_repo}")
62
+ api.upload_file(
63
+ path_or_fileobj=str(file_path),
64
+ path_in_repo=path_in_repo,
65
+ repo_id=DATASET_REPO,
66
+ repo_type="dataset",
67
+ )
68
+
69
+ logger.info("✨ 所有文件同步完成!")
70
+ return True
71
+ except Exception as e:
72
+ logger.error(f"❌ 同步过程中出错: {e}")
73
+ return False
74
+
75
+ if __name__ == "__main__":
76
+ sync_to_hf()
服务/数据采集/整理行情数据.py CHANGED
@@ -42,6 +42,7 @@ import pandas as pd
42
  默认_DELETE_SOURCE = False # (已弃用,建议用 MOVE_TO_BACKUP) 整理完后是否删除原始碎片文件?
43
  默认_DELETE_TODAY = False # 是否移动/删除今天的碎片文件?(今天的还在采集,建议不移动)
44
  默认_CHECK_GAP = True # 是否检查并生成空缺报告?
 
45
  默认_GAP_MS_DEPTH = 2000 # 深度数据超过 2 秒没数据就算小缺口
46
  默认_GAP_MS_TRADE = 10000 # 成交数据超过 10 秒没数据就算小缺口
47
  默认_GAP_SAMPLES = 50 # 每个文件最多记录多少个缺口样本
@@ -454,6 +455,7 @@ def main(argv: list[str]) -> int:
454
  parser.add_argument("--delete-source", action="store_true", default=bool(默认_DELETE_SOURCE))
455
  parser.add_argument("--delete-today", action="store_true", default=bool(默认_DELETE_TODAY))
456
  parser.add_argument("--check-gap", action="store_true", default=bool(默认_CHECK_GAP))
 
457
  parser.add_argument("--gap-ms-depth", type=int, default=int(默认_GAP_MS_DEPTH))
458
  parser.add_argument("--gap-ms-trade", type=int, default=int(默认_GAP_MS_TRADE))
459
  parser.add_argument("--gap-samples", type=int, default=int(默认_GAP_SAMPLES))
@@ -646,6 +648,16 @@ def main(argv: list[str]) -> int:
646
  md_path.write_text("\n".join(md_lines), encoding="utf-8")
647
  print(f"可读报告已生成: {md_path}")
648
 
 
 
 
 
 
 
 
 
 
 
649
  return 0
650
 
651
 
 
42
  默认_DELETE_SOURCE = False # (已弃用,建议用 MOVE_TO_BACKUP) 整理完后是否删除原始碎片文件?
43
  默认_DELETE_TODAY = False # 是否移动/删除今天的碎片文件?(今天的还在采集,建议不移动)
44
  默认_CHECK_GAP = True # 是否检查并生成空缺报告?
45
+ 默认_SYNC_HF = True # 整理完成后是否自动同步到 Hugging Face Dataset
46
  默认_GAP_MS_DEPTH = 2000 # 深度数据超过 2 秒没数据就算小缺口
47
  默认_GAP_MS_TRADE = 10000 # 成交数据超过 10 秒没数据就算小缺口
48
  默认_GAP_SAMPLES = 50 # 每个文件最多记录多少个缺口样本
 
455
  parser.add_argument("--delete-source", action="store_true", default=bool(默认_DELETE_SOURCE))
456
  parser.add_argument("--delete-today", action="store_true", default=bool(默认_DELETE_TODAY))
457
  parser.add_argument("--check-gap", action="store_true", default=bool(默认_CHECK_GAP))
458
+ parser.add_argument("--sync-hf", action="store_true", default=bool(默认_SYNC_HF))
459
  parser.add_argument("--gap-ms-depth", type=int, default=int(默认_GAP_MS_DEPTH))
460
  parser.add_argument("--gap-ms-trade", type=int, default=int(默认_GAP_MS_TRADE))
461
  parser.add_argument("--gap-samples", type=int, default=int(默认_GAP_SAMPLES))
 
648
  md_path.write_text("\n".join(md_lines), encoding="utf-8")
649
  print(f"可读报告已生成: {md_path}")
650
 
651
+ if args.sync_hf:
652
+ try:
653
+ from hf_sync import sync_to_hf
654
+ print("\n🚀 正在触发云端同步...")
655
+ sync_to_hf()
656
+ except ImportError:
657
+ print("\n⚠️ 无法加载 hf_sync.py,跳过同步。")
658
+ except Exception as e:
659
+ print(f"\n❌ 同步过程中出错: {e}")
660
+
661
  return 0
662
 
663