Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """ | |
| Quant_Unified 监控面板 (Gradio 版) | |
| 这是一个运行在 Hugging Face Spaces 上的监控应用,它会: | |
| 1. 在后台启动数据采集服务。 | |
| 2. 实时从 Supabase 获取并显示各个服务的运行状态。 | |
| """ | |
| import gradio as gr | |
| import os | |
| import subprocess | |
| import time | |
| import pandas as pd | |
| from supabase import create_client | |
| from datetime import datetime | |
| import threading | |
| # ========================================== | |
| # 1. 配置与初始化 | |
| # ========================================== | |
# Connection settings come from the environment. The service-role key is
# preferred; the anonymous key is used only when the former is unset or empty.
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = (
    os.environ.get("SUPABASE_SERVICE_ROLE_KEY")
    or os.environ.get("SUPABASE_ANON_KEY")
)

# The client is created only when both the URL and a key are available;
# otherwise `supabase` stays None and callers must check for that.
supabase = (
    create_client(SUPABASE_URL, SUPABASE_KEY)
    if SUPABASE_URL and SUPABASE_KEY
    else None
)
def 启动后台采集():
    """Run the collector script as a child process and relay its output.

    The script at ``服务/数据采集/启动采集.py`` (relative to the current working
    directory) is started with the same interpreter as this process. Its
    stdout and stderr are merged and echoed line by line with a
    ``[Collector]`` prefix. The call blocks until the child closes its
    output, so it is meant to be used as a thread target.

    Returns:
        None. The child's exit code is printed once it terminates.
    """
    import sys  # local import: this block does not rely on a file-level one

    print("🚀 正在启动后台采集服务...", flush=True)
    script_path = os.path.join("服务", "数据采集", "启动采集.py")

    env = os.environ.copy()
    # Let the child import project packages from the current directory.
    env["PYTHONPATH"] = os.getcwd()
    # A piped stdout is block-buffered by default; without this the relayed
    # log would arrive in large delayed chunks instead of line by line.
    env["PYTHONUNBUFFERED"] = "1"
    # Make the child emit UTF-8 so it matches the decoding used below.
    env["PYTHONIOENCODING"] = "utf-8"

    # The context manager closes the pipe and waits for the child, so the
    # process is reaped and `returncode` is populated afterwards.
    with subprocess.Popen(
        [sys.executable, script_path],
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        encoding="utf-8",
        errors="replace",  # a stray undecodable byte must not kill the relay
        bufsize=1,
    ) as process:
        for line in process.stdout:
            print(f"[Collector] {line.strip()}", flush=True)

    print(f"[Collector] 采集进程已退出,退出码: {process.returncode}", flush=True)
def 周期性整理与同步(initial_delay=60, interval=4 * 3600):
    """Periodically consolidate collected data and sync it to Hugging Face.

    After an initial delay, this loops forever: it runs
    ``服务/数据采集/整理行情数据.py`` with the current interpreter, calls
    ``sync_to_hf()`` from ``服务.数据采集.hf_sync``, and then sleeps for
    ``interval`` seconds. Any failure is printed and the loop continues, so
    one bad cycle does not stop later ones. Intended as a daemon-thread
    target; it never returns.

    Args:
        initial_delay: Seconds to wait before the first cycle, so the
            collector has time to produce data. Defaults to 60.
        interval: Seconds to sleep between cycles. Defaults to 4 hours.
    """
    import sys  # local import: this block does not rely on a file-level one

    organize_script = os.path.join("服务", "数据采集", "整理行情数据.py")
    env = os.environ.copy()
    # Let the child import project packages from the current directory.
    env["PYTHONPATH"] = os.getcwd()

    # The start-up delay applies once. Previously it sat inside the loop and
    # was added to every cycle, contradicting its "first run only" intent.
    time.sleep(initial_delay)

    while True:
        print("🕒 开始执行周期性数据整理与同步...")
        try:
            # Step 1: consolidate. The original author's note says the script
            # removes the raw fragments afterwards; not verifiable from here.
            subprocess.run([sys.executable, organize_script], env=env, check=True)

            # Step 2: upload. Imported lazily, so a missing or broken sync
            # module only fails this cycle instead of the whole app.
            print("📤 开始同步到 Hugging Face Dataset...")
            from 服务.数据采集.hf_sync import sync_to_hf

            # A truthy return is treated as success, as in the original.
            if sync_to_hf():
                print("✅ 周期性整理与同步完成。")
            else:
                # Previously a falsy result was dropped without any trace.
                print("⚠️ 同步到 Hugging Face 未成功 (sync_to_hf 返回假值)")
        except Exception as e:
            # Thread boundary: report and keep the schedule alive.
            print(f"❌ 周期性整理与同步失败: {e}")

        time.sleep(interval)
# Background workers. Both are daemon threads, so they do not keep the
# interpreter alive once the main thread exits.
thread_collector = threading.Thread(daemon=True, target=启动后台采集)
thread_sync = threading.Thread(daemon=True, target=周期性整理与同步)

# Start the collector first, then the periodic organize-and-sync loop.
thread_collector.start()
thread_sync.start()
| # ========================================== | |
| # 2. UI 逻辑 | |
| # ========================================== | |
def 获取监控数据():
    """Fetch all rows of the ``service_status`` table and shape them for display.

    Each record's ``details`` value is unpacked when it is a dict:
    ``cpu_percent`` and ``memory_percent`` become their own columns (blank
    when absent) and the remaining keys are rendered as a string. A
    non-dict ``details`` is stringified as-is and the metric columns are
    left blank unless the record already has them. ``updated_at`` is
    reformatted as ``YYYY-MM-DD HH:MM:SS``, and the kept columns are renamed
    to their Chinese display labels.

    Returns:
        pandas.DataFrame: the status table, or a one-row frame with a
        ``错误`` / ``信息`` column when the client is not configured, the
        table is empty, or fetching or processing raised an exception.
    """
    if not supabase:
        return pd.DataFrame([{"错误": "未配置 SUPABASE_URL 或 KEY"}])

    try:
        response = supabase.table("service_status").select("*").execute()
        data = response.data
        if not data:
            return pd.DataFrame([{"信息": "目前没有服务在运行"}])

        # Normalize plain dicts before building the frame. The earlier version
        # mutated rows inside DataFrame.apply(axis=1), which pandas documents
        # as unsupported, and left NaN in the metric columns for any row
        # whose `details` was not a dict.
        metric_keys = ("cpu_percent", "memory_percent")
        records = []
        for item in data:
            record = dict(item)  # copy: never modify the response payload
            details = record.get("details", {})
            if isinstance(details, dict):
                for key in metric_keys:
                    record[key] = details.get(key, "")
                others = {k: v for k, v in details.items() if k not in metric_keys}
                record["details_str"] = str(others)
            else:
                for key in metric_keys:
                    # Keep a top-level value if the table provides one.
                    record.setdefault(key, "")
                record["details_str"] = str(details)
            records.append(record)

        df = pd.DataFrame(records)

        # Replace the raw timestamp with a compact, human-readable one.
        if "updated_at" in df.columns:
            df["更新时间"] = pd.to_datetime(df["updated_at"]).dt.strftime('%Y-%m-%d %H:%M:%S')
            df = df.drop(columns=["updated_at"])

        # Display labels for the columns shown in the UI.
        rename_map = {
            "service_name": "服务名称",
            "status": "状态",
            "cpu_percent": "CPU使用率(%)",
            "memory_percent": "内存使用率(%)",
            "details_str": "详细信息",
        }

        # Keep only the display columns, in this order, skipping absent ones.
        cols = ["service_name", "status", "details_str", "cpu_percent", "memory_percent", "更新时间"]
        available_cols = [c for c in cols if c in df.columns]
        return df[available_cols].rename(columns=rename_map)
    except Exception as e:
        # UI boundary: show the failure in the table instead of raising.
        return pd.DataFrame([{"错误": f"获取数据失败: {str(e)}"}])
def 检查配置状态():
    """Report which required environment variables are currently set.

    Returns:
        dict: maps ``SUPABASE_URL``, ``SUPABASE_KEY`` and ``HF_TOKEN`` to a
        display string saying whether the setting is present. The Supabase
        key counts as present when either ``SUPABASE_SERVICE_ROLE_KEY`` or
        ``SUPABASE_ANON_KEY`` is non-empty.
    """
    configured = "✅ 已配置"

    has_url = bool(os.getenv("SUPABASE_URL"))
    has_key = any(
        os.getenv(name)
        for name in ("SUPABASE_SERVICE_ROLE_KEY", "SUPABASE_ANON_KEY")
    )
    has_token = bool(os.getenv("HF_TOKEN"))

    report = {}
    report["SUPABASE_URL"] = configured if has_url else "❌ 未配置"
    report["SUPABASE_KEY"] = configured if has_key else "❌ 未配置"
    # The missing-token message also tells the user where to set it.
    if has_token:
        report["HF_TOKEN"] = configured
    else:
        report["HF_TOKEN"] = "❌ 未配置 (需手动在 Settings -> Secrets 配置)"
    return report
| # ========================================== | |
| # 3. 构建 Gradio 界面 | |
| # ========================================== | |
# NOTE(review): the original indentation was lost, so the nesting below is
# reconstructed from syntax. Placing the description text outside the
# accordion is an assumption; confirm against the deployed layout.
with gr.Blocks(theme=gr.themes.Soft(), title="Quant_Unified 监控中心") as demo:
    # Page heading.
    gr.Markdown("# 🚀 Quant_Unified 量化系统监控中心")

    # Collapsed panel with one line per configuration item, evaluated once
    # when the UI is built.
    with gr.Accordion("🛠️ 系统环境检查", open=False):
        conf = 检查配置状态()
        for k, v in conf.items():
            gr.Markdown(f"**{k}**: {v}")

    gr.Markdown("实时展示部署在云端的采集服务状态。数据通过 Supabase 同步。")
    # Divider. The original note says this line was added to trigger a restart.
    gr.Markdown("---")

    # Read-only status table. The data function is passed as the value and
    # `every=5` asks Gradio to re-run it on a 5-second interval.
    with gr.Row():
        status_table = gr.DataFrame(
            value=获取监控数据,
            label="服务状态列表",
            interactive=False,
            every=5,
        )

    # Manual refresh writes the same data function's result into the table.
    with gr.Row():
        refresh_btn = gr.Button("手动刷新")
    refresh_btn.click(fn=获取监控数据, outputs=status_table)

    gr.Markdown("---")
    gr.Markdown("💡 **提示**:如果状态显示为 'ok',说明后台采集正在正常工作。")
# Script entry point: serve the UI on all interfaces at port 7860.
if __name__ == "__main__":
    demo.launch(
        server_port=7860,
        server_name="0.0.0.0",
    )