# -*- coding: utf-8 -*- import gradio as gr import asyncio import threading import json import sys import os import importlib import random from datetime import datetime # ========== 顶栏主题与样式(浅色、中文) ========== theme = gr.themes.Soft() css = """ :root { --radius: 14px; } #topbar { display:flex; align-items:center; justify-content:space-between; padding:12px 16px; background:linear-gradient(90deg,#f8fafc,#eef2ff,#e0f2fe); color:#0f172a; border:1px solid #e5e7eb; border-radius: var(--radius); box-shadow: 0 4px 14px rgba(15,23,42,.06); margin-bottom:12px; width: 100%; box-sizing: border-box; } #topbar h1 { font-size:18px; margin:0; font-weight:800; letter-spacing:.5px } #topbar .right { display:flex; gap:8px; align-items:center } .badge { padding:4px 10px; border-radius:999px; background:#e0f2fe; color:#0369a1; font-weight:700; border:1px solid #bae6fd } .footer-note { opacity:.7; font-size:12px; } """ # ========== 路径修复与导入别名(兼容 from 策略基类 import 策略接口) ========== def _修复导入路径与别名(): root = os.path.dirname(os.path.abspath(__file__)) if root not in sys.path: sys.path.insert(0, root) for d in ['插件_引擎', '插件_交易', '插件_行情', '插件_资产']: p = os.path.join(root, d) if os.path.isdir(p) and p not in sys.path: sys.path.insert(0, p) try: mod = importlib.import_module('插件_交易.策略基类') if '策略基类' not in sys.modules: sys.modules['策略基类'] = mod except Exception: pass _修复导入路径与别名() # ========== 内置“轻量引擎”(后端缺失时自动兜底) ========== class _LiteExchange: def __init__(self): self.净值 = 1.0 self.峰值 = 1.0 self.回撤 = 0.0 def tick(self): drift = 0.0002 noise = random.gauss(0, 0.002) self.净值 = max(0.5, self.净值 * (1 + drift + noise)) self.峰值 = max(self.峰值, self.净值) if self.峰值 > 0: self.回撤 = (self.峰值 - self.净值) / self.峰值 class 轻量自适应策略: def __init__(self, 名称: str, 参数: dict): self.名称 = 名称 self.参数 = 参数 class 轻量引擎: def __init__(self, 记录函数): self._停 = False self.交易所 = _LiteExchange() self._log = 记录函数 self._策略 = {} def 注册策略(self, 策略ID: str, 策略对象, 合约: str, 周期分钟: int): self._策略[策略ID] = {"对象": 策略对象, "合约": 合约, "周期": 周期分钟} self._log(f"[轻量引擎] 已注册策略: {策略ID} -> {合约} / {周期分钟}m") async def 启动(self, 合约列表): self._log("[轻量引擎] 启动(演示模式,无真实行情订阅)") try: i = 0 while not self._停: self.交易所.tick() i += 1 if i % 5 == 0: self._log(f"[轻量引擎] 心跳 | 净值={self.交易所.净值:.4f} 回撤={self.交易所.回撤:.2%}") await asyncio.sleep(2) finally: self._log("[轻量引擎] 已停止") def 停止(self): self._停 = True # ========== 全局状态 ========== 引擎实例 = None 运行线程 = None 状态锁 = threading.Lock() 运行状态 = { "运行中": False, "合约": "", "周期": 0, "净值": 1.0, "回撤": 0.0, "日志": [] } 默认参数 = { "ver": "v1", "adx_thr": 25, "atr_buf_tr": 0.1, "rsi_low": 30, "rsi_high": 70, "mr_stop_atr": 2.0, "trend_logic": "adx", "alloc_mode": "fixed" } def 从参数推断周期及选中(JSON文本: str): try: d = json.loads(JSON文本) except Exception: d = {} tf = None for k in ('tf', 'period', '周期', 'bar', 'bar_min', 'timeframe', 'k', 'k_min'): v = d.get(k) if isinstance(v, (int, float)) and int(v) > 0: tf = int(v); break if isinstance(v, str) and v.strip().isdigit(): tf = int(v.strip()); break if not tf: tf = 60 base = [30, 60, 120, 240] if tf not in base: base.append(tf) base = sorted(set(base)) return [str(x) for x in base], str(tf) def 参数变更更新周期(参数JSON文本: str): choices, value = 从参数推断周期及选中(参数JSON文本) import gradio as gr return gr.update(choices=choices, value=value) def _记录(msg: str): with 状态锁: 运行状态["日志"].append(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}") if len(运行状态["日志"]) > 500: 运行状态["日志"] = 运行状态["日志"][-300:] # ========== 真实/轻量 引擎跑法 ========== async def _用真实引擎运行(合约: str, 周期: int, 参数: dict): global 引擎实例 _修复导入路径与别名() from 插件_引擎.多策略引擎 import 多策略引擎 from 插件_交易.策略_自适应 import 自适应策略 引擎实例 = 多策略引擎() 策略 = 自适应策略(f"{合约}_{周期}分", 参数) 引擎实例.注册策略(f"{合约}_{周期}", 策略, 合约, int(周期)) _记录(f"引擎启动:{合约} / {周期} 分(真实引擎)") await 引擎实例.启动([合约]) async def _用轻量引擎运行(合约: str, 周期: int, 参数: dict): global 引擎实例 引擎实例 = 轻量引擎(_记录) 策略 = 轻量自适应策略(f"{合约}_{周期}分", 参数) 引擎实例.注册策略(f"{合约}_{周期}", 策略, 合约, int(周期)) _记录(f"引擎启动:{合约} / {周期} 分(轻量引擎:演示模式)") await 引擎实例.启动([合约]) def _后台任务(合约: str, 周期: int, 参数: dict): global 引擎实例 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: async def 主循环(): try: await _用真实引擎运行(合约, 周期, 参数) except Exception as e: _记录(f"真实引擎启动失败,切换至内置轻量引擎。原因:{e}") try: await _用轻量引擎运行(合约, 周期, 参数) except Exception as e2: _记录(f"轻量引擎也启动失败:{e2}") finally: with 状态锁: 运行状态["运行中"] = False _记录("引擎结束") loop.run_until_complete(主循环()) finally: try: loop.run_until_complete(asyncio.sleep(0)) except Exception: pass loop.close() # ========== 业务函数(与UI绑定) ========== def 启动模拟(合约: str, 周期: str, 参数json: str): global 运行线程 with 状态锁: if 运行状态["运行中"]: return f"已有任务运行中:{运行状态['合约']} / {运行状态['周期']}分", "\n".join(运行状态["日志"][-100:]) if 参数json.strip(): try: 参数 = json.loads(参数json) except Exception as e: return f"参数JSON格式错误:{e}", "\n".join(运行状态["日志"][-100:]) else: 参数 = 默认参数 try: p = int(周期) except: return "周期必须是数字(30/60/120/240)", "\n".join(运行状态["日志"][-100:]) with 状态锁: 运行状态["运行中"] = True 运行状态["合约"] = 合约 运行状态["周期"] = p 运行状态["净值"] = 1.0 运行状态["回撤"] = 0.0 运行状态["日志"].clear() _记录(f"准备启动:合约={合约} 周期={p} 参数={参数}") 运行线程 = threading.Thread(target=_后台任务, args=(合约, p, 参数), daemon=True) 运行线程.start() return "模拟启动成功", "\n".join(运行状态["日志"][-100:]) def 停止模拟(): global 引擎实例 if 引擎实例 is None: _记录("当前没有运行中的任务") with 状态锁: 运行状态["运行中"] = False return "当前没有运行中的任务" try: 引擎实例.停止() _记录("已请求停止(引擎将不再处理新信号/演示将退出)") return "已发送停止信号" except Exception as e: _记录(f"停止失败:{e}") return f"停止失败:{e}" def 获取状态(): with 状态锁: try: if 运行状态["运行中"] and 引擎实例 is not None and hasattr(引擎实例, '交易所') and 引擎实例.交易所 is not None: nv = float(getattr(引擎实例.交易所, "净值", 1.0) or 1.0) dd = float(getattr(引擎实例.交易所, "回撤", 0.0) or 0.0) 运行状态["净值"] = nv 运行状态["回撤"] = dd except Exception as e: _记录(f"读取状态失败:{e}") 状态文本 = f"运行中: {运行状态['运行中']} | 合约: {运行状态['合约']} | 周期: {运行状态['周期']}m | 净值: {运行状态['净值']:.4f} | 回撤: {运行状态['回撤']:.2%}" 日志文本 = "\n".join(运行状态["日志"][-120:]) return 状态文本, 日志文本 # ========== 引入设置面板(来自 配置.py) ========== try: import 配置 as 配置模块 except Exception: 配置模块 = None # ========== Gradio 界面 ========== with gr.Blocks(title="模拟实盘(OKX 公共WS + 多策略引擎)", theme=theme, css=css) as demo: # 给顶栏外层添加 prose / gr-prose 类,使其宽度与 Markdown 一致 gr.HTML( '