File size: 6,773 Bytes
8e6a923
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97507d2
e09b7ad
97507d2
 
 
 
 
 
 
 
 
 
e09b7ad
97507d2
e09b7ad
 
 
 
 
 
97507d2
e09b7ad
97507d2
e09b7ad
 
97507d2
8e6a923
97507d2
 
 
 
 
8e6a923
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42637c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8e6a923
 
 
 
 
 
 
 
 
 
 
42637c4
8e6a923
42637c4
 
 
 
8e6a923
 
 
 
16ceaf4
 
 
 
 
 
 
 
8e6a923
 
 
 
 
16ceaf4
 
 
 
 
 
8e6a923
552576a
8e6a923
 
36c4a91
 
 
 
40c45cb
36c4a91
8e6a923
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# -*- coding: utf-8 -*-
"""
Quant_Unified 监控面板 (Gradio 版)
这是一个运行在 Hugging Face Spaces 上的监控应用,它会:
1. 在后台启动数据采集服务。
2. 实时从 Supabase 获取并显示各个服务的运行状态。
"""

import gradio as gr
import os
import subprocess
import time
import pandas as pd
from supabase import create_client
from datetime import datetime
import threading

# ==========================================
# 1. 配置与初始化
# ==========================================
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") or os.getenv("SUPABASE_ANON_KEY")

# 初始化 Supabase 客户端
supabase = None
if SUPABASE_URL and SUPABASE_KEY:
    supabase = create_client(SUPABASE_URL, SUPABASE_KEY)

def 启动后台采集():
    """在独立进程中启动采集脚本"""
    print("🚀 正在启动后台采集服务...")
    script_path = os.path.join("服务", "数据采集", "启动采集.py")
    # 设置环境变量,确保子进程能找到项目根目录
    env = os.environ.copy()
    env["PYTHONPATH"] = os.getcwd()
    # 使用 sys.executable 确保使用相同的 Python 解释器
    import sys
    process = subprocess.Popen(
        [sys.executable, script_path],
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
        universal_newlines=True
    )
    # 实时打印日志到终端(HF 容器日志可见)
    for line in process.stdout:
        print(f"[Collector] {line.strip()}")

def 周期性整理与同步():
    """每隔 4 小时执行一次数据整理与 HF 同步"""
    import sys
    organize_script = os.path.join("服务", "数据采集", "整理行情数据.py")
    env = os.environ.copy()
    env["PYTHONPATH"] = os.getcwd()
    
    while True:
        # 等待一段时间再执行第一次(让采集运行一会儿)
        time.sleep(60) # 启动后 1 分钟先跑一次
        print("🕒 开始执行周期性数据整理与同步...")
        try:
            # 1. 执行整理 (整理后会自动删除原始碎片)
            subprocess.run([sys.executable, organize_script], env=env, check=True)
            
            # 2. 执行云端同步
            print("📤 开始同步到 Hugging Face Dataset...")
            from 服务.数据采集.hf_sync import sync_to_hf
            if sync_to_hf():
                print("✅ 周期性整理与同步完成。")
        except Exception as e:
            print(f"❌ 周期性整理与同步失败: {e}")
        
        # 每 4 小时运行一次
        time.sleep(4 * 3600)

# 启动后台线程
thread_collector = threading.Thread(target=启动后台采集, daemon=True)
thread_collector.start()

thread_sync = threading.Thread(target=周期性整理与同步, daemon=True)
thread_sync.start()

# ==========================================
# 2. UI 逻辑
# ==========================================
def 获取监控数据():
    """从 Supabase 获取 service_status 表的所有数据"""
    if not supabase:
        return pd.DataFrame([{"错误": "未配置 SUPABASE_URL 或 KEY"}])
    
    try:
        response = supabase.table("service_status").select("*").execute()
        data = response.data
        if not data:
            return pd.DataFrame([{"信息": "目前没有服务在运行"}])
        
        # 转换为 DataFrame 方便展示
        df = pd.DataFrame(data)
        
        # --- 新增:从嵌套的 details 字典中提取性能指标 ---
        def extract_metrics(row):
            details = row.get("details", {})
            if isinstance(details, dict):
                row["cpu_percent"] = details.get("cpu_percent", "")
                row["memory_percent"] = details.get("memory_percent", "")
                # 将 details 转换为更易读的字符串,或者只保留除指标外的其他信息
                other_details = {k: v for k, v in details.items() if k not in ["cpu_percent", "memory_percent"]}
                row["details_str"] = str(other_details)
            else:
                row["details_str"] = str(details)
            return row

        df = df.apply(extract_metrics, axis=1)
        
        # 简单处理下时间格式
        if "updated_at" in df.columns:
            df["更新时间"] = pd.to_datetime(df["updated_at"]).dt.strftime('%Y-%m-%d %H:%M:%S')
            df = df.drop(columns=["updated_at"])
        
        # 重命名列名,让高中生也能看懂
        rename_map = {
            "service_name": "服务名称",
            "status": "状态",
            "cpu_percent": "CPU使用率(%)",
            "memory_percent": "内存使用率(%)",
            "details_str": "详细信息"
        }
        # 只保留需要的列
        cols = ["service_name", "status", "details_str", "cpu_percent", "memory_percent", "更新时间"]
        available_cols = [c for c in cols if c in df.columns]
        df = df[available_cols].rename(columns=rename_map)
        return df
    except Exception as e:
        return pd.DataFrame([{"错误": f"获取数据失败: {str(e)}"}])

def 检查配置状态():
    status = {
        "SUPABASE_URL": "✅ 已配置" if os.getenv("SUPABASE_URL") else "❌ 未配置",
        "SUPABASE_KEY": "✅ 已配置" if (os.getenv("SUPABASE_SERVICE_ROLE_KEY") or os.getenv("SUPABASE_ANON_KEY")) else "❌ 未配置",
        "HF_TOKEN": "✅ 已配置" if os.getenv("HF_TOKEN") else "❌ 未配置 (需手动在 Settings -> Secrets 配置)",
    }
    return status

# ==========================================
# 3. 构建 Gradio 界面
# ==========================================
with gr.Blocks(title="Quant_Unified 监控中心", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🚀 Quant_Unified 量化系统监控中心")
    
    with gr.Accordion("🛠️ 系统环境检查", open=False):
        conf = 检查配置状态()
        for k, v in conf.items():
            gr.Markdown(f"**{k}**: {v}")
            
    gr.Markdown("实时展示部署在云端的采集服务状态。数据通过 Supabase 同步。")
    gr.Markdown("---") # 添加一个小分隔线触发重启
    
    with gr.Row():
        status_table = gr.DataFrame(
            label="服务状态列表", 
            value=获取监控数据, 
            every=5,
            interactive=False
        ) 
        
    with gr.Row():
        refresh_btn = gr.Button("手动刷新")
        refresh_btn.click(获取监控数据, outputs=status_table)

    gr.Markdown("---")
    gr.Markdown("💡 **提示**:如果状态显示为 'ok',说明后台采集正在正常工作。")

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)