chuan
fix: remove incompatible height and wrap arguments from gr.DataFrame
40c45cb
# -*- coding: utf-8 -*-
"""
Quant_Unified 监控面板 (Gradio 版)
这是一个运行在 Hugging Face Spaces 上的监控应用,它会:
1. 在后台启动数据采集服务。
2. 实时从 Supabase 获取并显示各个服务的运行状态。
"""
import gradio as gr
import os
import subprocess
import time
import pandas as pd
from supabase import create_client
from datetime import datetime
import threading
# ==========================================
# 1. 配置与初始化
# ==========================================
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") or os.getenv("SUPABASE_ANON_KEY")
# 初始化 Supabase 客户端
supabase = None
if SUPABASE_URL and SUPABASE_KEY:
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
def 启动后台采集():
"""在独立进程中启动采集脚本"""
print("🚀 正在启动后台采集服务...")
script_path = os.path.join("服务", "数据采集", "启动采集.py")
# 设置环境变量,确保子进程能找到项目根目录
env = os.environ.copy()
env["PYTHONPATH"] = os.getcwd()
# 使用 sys.executable 确保使用相同的 Python 解释器
import sys
process = subprocess.Popen(
[sys.executable, script_path],
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True
)
# 实时打印日志到终端(HF 容器日志可见)
for line in process.stdout:
print(f"[Collector] {line.strip()}")
def 周期性整理与同步():
"""每隔 4 小时执行一次数据整理与 HF 同步"""
import sys
organize_script = os.path.join("服务", "数据采集", "整理行情数据.py")
env = os.environ.copy()
env["PYTHONPATH"] = os.getcwd()
while True:
# 等待一段时间再执行第一次(让采集运行一会儿)
time.sleep(60) # 启动后 1 分钟先跑一次
print("🕒 开始执行周期性数据整理与同步...")
try:
# 1. 执行整理 (整理后会自动删除原始碎片)
subprocess.run([sys.executable, organize_script], env=env, check=True)
# 2. 执行云端同步
print("📤 开始同步到 Hugging Face Dataset...")
from 服务.数据采集.hf_sync import sync_to_hf
if sync_to_hf():
print("✅ 周期性整理与同步完成。")
except Exception as e:
print(f"❌ 周期性整理与同步失败: {e}")
# 每 4 小时运行一次
time.sleep(4 * 3600)
# 启动后台线程
thread_collector = threading.Thread(target=启动后台采集, daemon=True)
thread_collector.start()
thread_sync = threading.Thread(target=周期性整理与同步, daemon=True)
thread_sync.start()
# ==========================================
# 2. UI 逻辑
# ==========================================
def 获取监控数据():
"""从 Supabase 获取 service_status 表的所有数据"""
if not supabase:
return pd.DataFrame([{"错误": "未配置 SUPABASE_URL 或 KEY"}])
try:
response = supabase.table("service_status").select("*").execute()
data = response.data
if not data:
return pd.DataFrame([{"信息": "目前没有服务在运行"}])
# 转换为 DataFrame 方便展示
df = pd.DataFrame(data)
# --- 新增:从嵌套的 details 字典中提取性能指标 ---
def extract_metrics(row):
details = row.get("details", {})
if isinstance(details, dict):
row["cpu_percent"] = details.get("cpu_percent", "")
row["memory_percent"] = details.get("memory_percent", "")
# 将 details 转换为更易读的字符串,或者只保留除指标外的其他信息
other_details = {k: v for k, v in details.items() if k not in ["cpu_percent", "memory_percent"]}
row["details_str"] = str(other_details)
else:
row["details_str"] = str(details)
return row
df = df.apply(extract_metrics, axis=1)
# 简单处理下时间格式
if "updated_at" in df.columns:
df["更新时间"] = pd.to_datetime(df["updated_at"]).dt.strftime('%Y-%m-%d %H:%M:%S')
df = df.drop(columns=["updated_at"])
# 重命名列名,让高中生也能看懂
rename_map = {
"service_name": "服务名称",
"status": "状态",
"cpu_percent": "CPU使用率(%)",
"memory_percent": "内存使用率(%)",
"details_str": "详细信息"
}
# 只保留需要的列
cols = ["service_name", "status", "details_str", "cpu_percent", "memory_percent", "更新时间"]
available_cols = [c for c in cols if c in df.columns]
df = df[available_cols].rename(columns=rename_map)
return df
except Exception as e:
return pd.DataFrame([{"错误": f"获取数据失败: {str(e)}"}])
def 检查配置状态():
status = {
"SUPABASE_URL": "✅ 已配置" if os.getenv("SUPABASE_URL") else "❌ 未配置",
"SUPABASE_KEY": "✅ 已配置" if (os.getenv("SUPABASE_SERVICE_ROLE_KEY") or os.getenv("SUPABASE_ANON_KEY")) else "❌ 未配置",
"HF_TOKEN": "✅ 已配置" if os.getenv("HF_TOKEN") else "❌ 未配置 (需手动在 Settings -> Secrets 配置)",
}
return status
# ==========================================
# 3. 构建 Gradio 界面
# ==========================================
with gr.Blocks(title="Quant_Unified 监控中心", theme=gr.themes.Soft()) as demo:
gr.Markdown("# 🚀 Quant_Unified 量化系统监控中心")
with gr.Accordion("🛠️ 系统环境检查", open=False):
conf = 检查配置状态()
for k, v in conf.items():
gr.Markdown(f"**{k}**: {v}")
gr.Markdown("实时展示部署在云端的采集服务状态。数据通过 Supabase 同步。")
gr.Markdown("---") # 添加一个小分隔线触发重启
with gr.Row():
status_table = gr.DataFrame(
label="服务状态列表",
value=获取监控数据,
every=5,
interactive=False
)
with gr.Row():
refresh_btn = gr.Button("手动刷新")
refresh_btn.click(获取监控数据, outputs=status_table)
gr.Markdown("---")
gr.Markdown("💡 **提示**:如果状态显示为 'ok',说明后台采集正在正常工作。")
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=7860)