jy / app.py
1oscon's picture
Update app.py
f2700f2 verified
# -*- coding: utf-8 -*-
import gradio as gr
import asyncio
import threading
import json
import sys
import os
import importlib
import random
from datetime import datetime
# ========== 顶栏主题与样式(浅色、中文) ==========
theme = gr.themes.Soft()
css = """
:root { --radius: 14px; }
#topbar {
display:flex; align-items:center; justify-content:space-between;
padding:12px 16px;
background:linear-gradient(90deg,#f8fafc,#eef2ff,#e0f2fe);
color:#0f172a;
border:1px solid #e5e7eb;
border-radius: var(--radius);
box-shadow: 0 4px 14px rgba(15,23,42,.06);
margin-bottom:12px;
width: 100%;
box-sizing: border-box;
}
#topbar h1 { font-size:18px; margin:0; font-weight:800; letter-spacing:.5px }
#topbar .right { display:flex; gap:8px; align-items:center }
.badge { padding:4px 10px; border-radius:999px; background:#e0f2fe; color:#0369a1; font-weight:700; border:1px solid #bae6fd }
.footer-note { opacity:.7; font-size:12px; }
"""
# ========== 路径修复与导入别名(兼容 from 策略基类 import 策略接口) ==========
def _修复导入路径与别名():
root = os.path.dirname(os.path.abspath(__file__))
if root not in sys.path:
sys.path.insert(0, root)
for d in ['插件_引擎', '插件_交易', '插件_行情', '插件_资产']:
p = os.path.join(root, d)
if os.path.isdir(p) and p not in sys.path:
sys.path.insert(0, p)
try:
mod = importlib.import_module('插件_交易.策略基类')
if '策略基类' not in sys.modules:
sys.modules['策略基类'] = mod
except Exception:
pass
_修复导入路径与别名()
# ========== 内置“轻量引擎”(后端缺失时自动兜底) ==========
class _LiteExchange:
def __init__(self):
self.净值 = 1.0
self.峰值 = 1.0
self.回撤 = 0.0
def tick(self):
drift = 0.0002
noise = random.gauss(0, 0.002)
self.净值 = max(0.5, self.净值 * (1 + drift + noise))
self.峰值 = max(self.峰值, self.净值)
if self.峰值 > 0:
self.回撤 = (self.峰值 - self.净值) / self.峰值
class 轻量自适应策略:
def __init__(self, 名称: str, 参数: dict):
self.名称 = 名称
self.参数 = 参数
class 轻量引擎:
def __init__(self, 记录函数):
self._停 = False
self.交易所 = _LiteExchange()
self._log = 记录函数
self._策略 = {}
def 注册策略(self, 策略ID: str, 策略对象, 合约: str, 周期分钟: int):
self._策略[策略ID] = {"对象": 策略对象, "合约": 合约, "周期": 周期分钟}
self._log(f"[轻量引擎] 已注册策略: {策略ID} -> {合约} / {周期分钟}m")
async def 启动(self, 合约列表):
self._log("[轻量引擎] 启动(演示模式,无真实行情订阅)")
try:
i = 0
while not self._停:
self.交易所.tick()
i += 1
if i % 5 == 0:
self._log(f"[轻量引擎] 心跳 | 净值={self.交易所.净值:.4f} 回撤={self.交易所.回撤:.2%}")
await asyncio.sleep(2)
finally:
self._log("[轻量引擎] 已停止")
def 停止(self):
self._停 = True
# ========== 全局状态 ==========
引擎实例 = None
运行线程 = None
状态锁 = threading.Lock()
运行状态 = {
"运行中": False,
"合约": "",
"周期": 0,
"净值": 1.0,
"回撤": 0.0,
"日志": []
}
默认参数 = {
"ver": "v1",
"adx_thr": 25,
"atr_buf_tr": 0.1,
"rsi_low": 30,
"rsi_high": 70,
"mr_stop_atr": 2.0,
"trend_logic": "adx",
"alloc_mode": "fixed"
}
def 从参数推断周期及选中(JSON文本: str):
try:
d = json.loads(JSON文本)
except Exception:
d = {}
tf = None
for k in ('tf', 'period', '周期', 'bar', 'bar_min', 'timeframe', 'k', 'k_min'):
v = d.get(k)
if isinstance(v, (int, float)) and int(v) > 0:
tf = int(v); break
if isinstance(v, str) and v.strip().isdigit():
tf = int(v.strip()); break
if not tf:
tf = 60
base = [30, 60, 120, 240]
if tf not in base:
base.append(tf)
base = sorted(set(base))
return [str(x) for x in base], str(tf)
def 参数变更更新周期(参数JSON文本: str):
choices, value = 从参数推断周期及选中(参数JSON文本)
import gradio as gr
return gr.update(choices=choices, value=value)
def _记录(msg: str):
with 状态锁:
运行状态["日志"].append(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")
if len(运行状态["日志"]) > 500:
运行状态["日志"] = 运行状态["日志"][-300:]
# ========== 真实/轻量 引擎跑法 ==========
async def _用真实引擎运行(合约: str, 周期: int, 参数: dict):
global 引擎实例
_修复导入路径与别名()
from 插件_引擎.多策略引擎 import 多策略引擎
from 插件_交易.策略_自适应 import 自适应策略
引擎实例 = 多策略引擎()
策略 = 自适应策略(f"{合约}_{周期}分", 参数)
引擎实例.注册策略(f"{合约}_{周期}", 策略, 合约, int(周期))
_记录(f"引擎启动:{合约} / {周期} 分(真实引擎)")
await 引擎实例.启动([合约])
async def _用轻量引擎运行(合约: str, 周期: int, 参数: dict):
global 引擎实例
引擎实例 = 轻量引擎(_记录)
策略 = 轻量自适应策略(f"{合约}_{周期}分", 参数)
引擎实例.注册策略(f"{合约}_{周期}", 策略, 合约, int(周期))
_记录(f"引擎启动:{合约} / {周期} 分(轻量引擎:演示模式)")
await 引擎实例.启动([合约])
def _后台任务(合约: str, 周期: int, 参数: dict):
global 引擎实例
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
async def 主循环():
try:
await _用真实引擎运行(合约, 周期, 参数)
except Exception as e:
_记录(f"真实引擎启动失败,切换至内置轻量引擎。原因:{e}")
try:
await _用轻量引擎运行(合约, 周期, 参数)
except Exception as e2:
_记录(f"轻量引擎也启动失败:{e2}")
finally:
with 状态锁:
运行状态["运行中"] = False
_记录("引擎结束")
loop.run_until_complete(主循环())
finally:
try:
loop.run_until_complete(asyncio.sleep(0))
except Exception:
pass
loop.close()
# ========== 业务函数(与UI绑定) ==========
def 启动模拟(合约: str, 周期: str, 参数json: str):
global 运行线程
with 状态锁:
if 运行状态["运行中"]:
return f"已有任务运行中:{运行状态['合约']} / {运行状态['周期']}分", "\n".join(运行状态["日志"][-100:])
if 参数json.strip():
try:
参数 = json.loads(参数json)
except Exception as e:
return f"参数JSON格式错误:{e}", "\n".join(运行状态["日志"][-100:])
else:
参数 = 默认参数
try:
p = int(周期)
except:
return "周期必须是数字(30/60/120/240)", "\n".join(运行状态["日志"][-100:])
with 状态锁:
运行状态["运行中"] = True
运行状态["合约"] = 合约
运行状态["周期"] = p
运行状态["净值"] = 1.0
运行状态["回撤"] = 0.0
运行状态["日志"].clear()
_记录(f"准备启动:合约={合约} 周期={p} 参数={参数}")
运行线程 = threading.Thread(target=_后台任务, args=(合约, p, 参数), daemon=True)
运行线程.start()
return "模拟启动成功", "\n".join(运行状态["日志"][-100:])
def 停止模拟():
global 引擎实例
if 引擎实例 is None:
_记录("当前没有运行中的任务")
with 状态锁:
运行状态["运行中"] = False
return "当前没有运行中的任务"
try:
引擎实例.停止()
_记录("已请求停止(引擎将不再处理新信号/演示将退出)")
return "已发送停止信号"
except Exception as e:
_记录(f"停止失败:{e}")
return f"停止失败:{e}"
def 获取状态():
with 状态锁:
try:
if 运行状态["运行中"] and 引擎实例 is not None and hasattr(引擎实例, '交易所') and 引擎实例.交易所 is not None:
nv = float(getattr(引擎实例.交易所, "净值", 1.0) or 1.0)
dd = float(getattr(引擎实例.交易所, "回撤", 0.0) or 0.0)
运行状态["净值"] = nv
运行状态["回撤"] = dd
except Exception as e:
_记录(f"读取状态失败:{e}")
状态文本 = f"运行中: {运行状态['运行中']} | 合约: {运行状态['合约']} | 周期: {运行状态['周期']}m | 净值: {运行状态['净值']:.4f} | 回撤: {运行状态['回撤']:.2%}"
日志文本 = "\n".join(运行状态["日志"][-120:])
return 状态文本, 日志文本
# ========== 引入设置面板(来自 配置.py) ==========
try:
import 配置 as 配置模块
except Exception:
配置模块 = None
# ========== Gradio 界面 ==========
with gr.Blocks(title="模拟实盘(OKX 公共WS + 多策略引擎)", theme=theme, css=css) as demo:
# 给顶栏外层添加 prose / gr-prose 类,使其宽度与 Markdown 一致
gr.HTML(
'<div id="topbar"><h1>📈 模拟实盘控制台</h1><div class="right"><span class="badge">中文界面</span></div></div>',
elem_id="topbar_wrap",
elem_classes=["prose", "gr-prose"]
)
gr.Markdown("提示:点击“⚙️ 设置”进入配置页(上传CSV、参数加载/保存)。若后端模块缺失将自动进入演示引擎。")
with gr.Row():
with gr.Column(scale=7):
with gr.Group(visible=True) as 运行面板:
with gr.Row():
with gr.Column(scale=1):
合约输入 = gr.Textbox(label="合约ID", value="UXLINK-USDT-SWAP", placeholder="例:BTC-USDT-SWAP / ETH-USDT-SWAP")
周期选择 = gr.Radio(label="周期(分钟)", choices=["30", "60", "120", "240"], value="60")
参数输入 = gr.Textbox(label="策略参数(JSON)", value=json.dumps(默认参数, ensure_ascii=False, indent=2), lines=12)
with gr.Row():
启动按钮 = gr.Button("🚀 启动模拟", variant="primary")
停止按钮 = gr.Button("⏹️ 停止模拟", variant="stop")
刷新按钮 = gr.Button("🔄 刷新状态")
设置按钮 = gr.Button("⚙️ 设置", variant="secondary")
with gr.Column(scale=1):
状态显示 = gr.Textbox(label="运行状态", lines=2, interactive=False)
日志显示 = gr.Textbox(label="实时日志(最近120行)", lines=22, interactive=False)
with gr.Group(visible=False) as 设置面板:
返回按钮 = gr.Button("← 返回运行页", variant="secondary")
gr.Markdown("## 设置")
if 配置模块 is not None:
构件 = 配置模块.构建设置面板(容器=设置面板, 默认参数=默认参数, 记录函数=_记录)
if isinstance(构件, dict):
# 参数回填
if "参数编辑框" in 构件 and ("复制按钮" in 构件 or "复制参数按钮" in 构件):
回填参数按钮 = 构件.get("复制按钮") or 构件.get("复制参数按钮")
回填参数按钮.click(lambda s: s, inputs=构件["参数编辑框"], outputs=参数输入)
# 合约回填
if "回填合约按钮" in 构件 and "合约ID框" in 构件:
构件["回填合约按钮"].click(lambda s: s.strip(), inputs=构件["合约ID框"], outputs=合约输入)
else:
gr.Markdown("未找到 配置.py,无法加载设置面板。")
with gr.Column(scale=5):
gr.Markdown("### 使用说明")
gr.Markdown("- 在设置页上传CSV并完成列映射与合约登记\n- 将参数与合约一键回填到运行页\n- 启动后可在右侧查看状态和日志\n- 空闲时 Space 会休眠,策略不会持续运行(演示环境)")
gr.Markdown("<div class='footer-note'>提示:若需长期运行,请在自有服务器部署引擎,Space 仅作前端/演示。</div>")
# 页面切换
设置按钮.click(lambda: (gr.update(visible=False), gr.update(visible=True)), outputs=[运行面板, 设置面板])
返回按钮.click(lambda: (gr.update(visible=True), gr.update(visible=False)), outputs=[运行面板, 设置面板])
# 事件绑定(运行控制)
启动按钮.click(fn=启动模拟, inputs=[合约输入, 周期选择, 参数输入], outputs=[状态显示, 日志显示])
停止按钮.click(fn=停止模拟, outputs=状态显示)
刷新按钮.click(fn=获取状态, outputs=[状态显示, 日志显示])
# 参数JSON变化时,自动更新周期选项与选中值
参数输入.change(fn=参数变更更新周期, inputs=参数输入, outputs=周期选择)
# 初始和定时刷新
demo.load(fn=获取状态, outputs=[状态显示, 日志显示])
# 页面加载时按参数JSON回填周期
demo.load(fn=lambda s: 参数变更更新周期(s), inputs=参数输入, outputs=周期选择)
timer = gr.Timer(2)
timer.tick(fn=获取状态, outputs=[状态显示, 日志显示])
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=7860)