Spaces:
Paused
Paused
| import streamlit as st | |
| import requests | |
| import time | |
| import json | |
| import os | |
| import copy | |
| import threading | |
| import hmac | |
| import hashlib | |
| import urllib3 | |
| from datetime import datetime, timedelta, timezone, time as dt_time | |
| from collections import deque, defaultdict | |
| from dotenv import load_dotenv | |
| # --- 0. 基础配置 --- | |
| st.set_page_config(page_title="虚拟货币价格通知", page_icon="🚀", layout="wide") | |
| urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
| try: | |
| from streamlit_autorefresh import st_autorefresh | |
| except ImportError: | |
| st_autorefresh = None | |
| # 可选:实时强平依赖 websocket-client,缺失则自动禁用强平监控 | |
| try: | |
| import websocket as ws_client # websocket-client | |
| except ImportError: | |
| ws_client = None | |
| # --- 1. 环境变量与常量 --- | |
| load_dotenv() | |
| # 企业微信机器人 Webhook | |
| WEWORK_BOT_WEBHOOK = os.getenv("WEWORK_BOT_WEBHOOK") | |
| # 币安 API Key(公共行情接口无需密钥,仅在需要私有接口时使用) | |
| BINANCE_API_KEY = os.getenv("BINANCE_API_KEY") | |
| BINANCE_API_SECRET = os.getenv("BINANCE_API_SECRET") | |
| # 行情接口基址(部分地区需要换成 data-api.binance.vision 等镜像) | |
| SPOT_BASE = os.getenv("BINANCE_SPOT_BASE", "https://api.binance.com") | |
| FAPI_BASE = os.getenv("BINANCE_FAPI_BASE", "https://fapi.binance.com") | |
| FSTREAM_BASE = os.getenv("BINANCE_FSTREAM_BASE", "wss://fstream.binance.com/ws") | |
| # 配置持久化文件 | |
| CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| CONFIG_FILE = os.path.join(CURRENT_DIR, "crypto_config.json") | |
| REQ_HEADERS = {"User-Agent": "Mozilla/5.0"} | |
| # K线周期对应的毫秒数(用于分页拉取长周期历史) | |
| INTERVAL_MS = { | |
| "1m": 60_000, "5m": 300_000, "15m": 900_000, | |
| "1h": 3_600_000, "1d": 86_400_000, | |
| } | |
| # --- 2. 默认阈值配置 --- | |
| # 价格变动:每个时间窗口 [关注, 重要, 紧急](百分比,绝对值) | |
| def _btc_defaults(): | |
| return { | |
| "symbol": "BTCUSDT", | |
| "futures_symbol": "BTCUSDT", | |
| "enable_futures": True, | |
| # 个人关键价位(0 表示未设置,不监控) | |
| "cost_price": 0.0, | |
| "target_price": 0.0, | |
| "stop_price": 0.0, | |
| # 快速涨跌触发阈值(百分比绝对值),方向由涨/跌自动判断 | |
| "price_change": { | |
| "1m": 0.35, | |
| "5m": 0.80, | |
| "15m": 1.50, | |
| "1h": 2.50, | |
| }, | |
| # 关键价位 | |
| "near_cost_pct": 0.5, | |
| "near_key_pct": 0.4, | |
| # 突破 | |
| "break_24h_pct": 0.2, "break_24h_hold_sec": 120, | |
| "break_7d_pct": 0.5, | |
| # 成交量 | |
| "vol_anomaly_mult": 2.5, "vol_severe_mult": 5.0, | |
| # 振幅 | |
| "amp_5m": 1.0, "amp_15m": 1.8, "amp_1h": 3.0, | |
| # 大额成交 | |
| "big_trade_usdt": 1_000_000, "cum_trade_5m_usdt": 10_000_000, | |
| # 盘口 | |
| "spread_pct": 0.03, "spread_severe_pct": 0.08, | |
| "imbalance_depth_pct": 0.5, "imbalance_ratio": 2.5, | |
| "wall_depth_pct": 1.0, "wall_mult": 3.0, "wall_vanish_pct": 60.0, | |
| # 技术指标 | |
| "ma_deviation_pct": 2.5, | |
| "consecutive_klines": 5, | |
| "rsi_high": 75.0, "rsi_low": 25.0, | |
| # 合约(enable_futures 为 False 时整体跳过) | |
| "funding_high": 0.03, "funding_hot": 0.07, "funding_neg": -0.03, | |
| "oi_15m": 2.0, "oi_1h": 5.0, "oi_4h": 10.0, | |
| "liq_5m_usdt": 5_000_000, "liq_severe_usdt": 20_000_000, | |
| } | |
| def _ton_defaults(): | |
| d = _btc_defaults() | |
| d.update({ | |
| "symbol": "TONUSDT", | |
| "futures_symbol": "TONUSDT", | |
| "enable_futures": False, # TONUSDT 合约已下线,待 GRAMUSDT 重新上线后启用 | |
| "price_change": { | |
| "1m": 0.80, | |
| "5m": 1.80, | |
| "15m": 3.00, | |
| "1h": 5.00, | |
| }, | |
| "near_cost_pct": 1.0, | |
| "near_key_pct": 1.0, | |
| "break_24h_pct": 0.5, "break_24h_hold_sec": 180, | |
| "break_7d_pct": 1.2, | |
| "vol_anomaly_mult": 3.5, "vol_severe_mult": 8.0, | |
| "amp_5m": 2.5, "amp_15m": 5.0, "amp_1h": 8.0, | |
| "big_trade_usdt": 100_000, "cum_trade_5m_usdt": 500_000, | |
| "spread_pct": 0.12, "spread_severe_pct": 0.35, | |
| "imbalance_depth_pct": 1.0, "imbalance_ratio": 3.0, | |
| "wall_depth_pct": 2.0, "wall_mult": 4.0, "wall_vanish_pct": 70.0, | |
| "ma_deviation_pct": 5.0, | |
| "consecutive_klines": 4, | |
| "rsi_high": 80.0, "rsi_low": 20.0, | |
| "funding_high": 0.05, "funding_hot": 0.12, "funding_neg": -0.05, | |
| "oi_15m": 4.0, "oi_1h": 8.0, "oi_4h": 15.0, | |
| "liq_5m_usdt": 300_000, "liq_severe_usdt": 1_000_000, | |
| }) | |
| return d | |
| DEFAULT_CONFIG = { | |
| "coins": { | |
| "BTC": _btc_defaults(), | |
| "TON": _ton_defaults(), | |
| }, | |
| "global": { | |
| "poll_interval_sec": 30, | |
| "daily_report_hour": 9, | |
| "down_priority": True, | |
| "alert_sensitivity": 1.0, | |
| }, | |
| } | |
| # 各类告警冷却时间(分钟),避免重复刷屏 | |
| # 微观结构类(盘口/大单/价差/振幅)天然高频,冷却拉长,突出关键事件 | |
| COOLDOWN_MIN = { | |
| "price_change": 10, "near_cost": 30, "near_key": 30, | |
| "break_24h": 30, "break_7d": 120, "volume": 20, "amplitude": 20, | |
| "big_trade": 15, "cum_trade": 20, "spread": 30, "imbalance": 30, | |
| "wall": 90, "wall_vanish": 30, "ma_deviation": 30, | |
| "consecutive": 60, "rsi": 60, "funding": 120, "oi": 30, "liquidation": 10, | |
| } | |
| # --- 3. 配置持久化 --- | |
| def _deep_merge(base, override): | |
| """用 override 覆盖 base,缺失项以 base 补全(保证新增阈值字段有默认值)""" | |
| result = copy.deepcopy(base) | |
| for k, v in (override or {}).items(): | |
| if isinstance(v, dict) and isinstance(result.get(k), dict): | |
| result[k] = _deep_merge(result[k], v) | |
| else: | |
| result[k] = v | |
| return result | |
| def load_config(): | |
| """读取配置文件,并以默认值补全缺失字段""" | |
| cfg = copy.deepcopy(DEFAULT_CONFIG) | |
| if os.path.exists(CONFIG_FILE): | |
| try: | |
| with open(CONFIG_FILE, "r", encoding="utf-8") as f: | |
| saved = json.load(f) | |
| # global 直接合并 | |
| cfg["global"] = _deep_merge(cfg["global"], saved.get("global", {})) | |
| # coins:保留用户自定义币种,并以 BTC 默认结构补全字段 | |
| saved_coins = saved.get("coins", {}) | |
| merged_coins = {} | |
| for name, coin_cfg in saved_coins.items(): | |
| template = _btc_defaults() | |
| merged_coins[name] = _deep_merge(template, coin_cfg) | |
| if merged_coins: | |
| cfg["coins"] = merged_coins | |
| except Exception as e: | |
| print(f"⚠️ 配置读取失败,使用默认值: {e}") | |
| return cfg | |
| def save_config(cfg): | |
| try: | |
| with open(CONFIG_FILE, "w", encoding="utf-8") as f: | |
| json.dump(cfg, f, ensure_ascii=False, indent=2) | |
| return True | |
| except Exception as e: | |
| print(f"❌ 配置保存失败: {e}") | |
| return False | |
| # --- 4. 工具函数 --- | |
| def get_beijing_now(): | |
| utc_now = datetime.now(timezone.utc) | |
| return utc_now.astimezone(timezone(timedelta(hours=8))).replace(tzinfo=None) | |
| def fmt_price(p): | |
| """根据量级自适应小数位""" | |
| try: | |
| p = float(p) | |
| except (TypeError, ValueError): | |
| return str(p) | |
| if p >= 100: | |
| return f"{p:,.2f}" | |
| if p >= 1: | |
| return f"{p:.4f}" | |
| return f"{p:.6f}" | |
| def fmt_usdt(v): | |
| """金额转人类可读(万/亿)""" | |
| try: | |
| v = float(v) | |
| except (TypeError, ValueError): | |
| return str(v) | |
| if v >= 1e8: | |
| return f"{v / 1e8:.2f}亿USDT" | |
| if v >= 1e4: | |
| return f"{v / 1e4:.2f}万USDT" | |
| return f"{v:.0f}USDT" | |
| def compute_rsi(closes, period=14): | |
| """标准 RSI""" | |
| if len(closes) < period + 1: | |
| return None | |
| gains, losses = 0.0, 0.0 | |
| for i in range(1, period + 1): | |
| diff = closes[i] - closes[i - 1] | |
| if diff >= 0: | |
| gains += diff | |
| else: | |
| losses -= diff | |
| avg_gain = gains / period | |
| avg_loss = losses / period | |
| for i in range(period + 1, len(closes)): | |
| diff = closes[i] - closes[i - 1] | |
| gain = diff if diff > 0 else 0.0 | |
| loss = -diff if diff < 0 else 0.0 | |
| avg_gain = (avg_gain * (period - 1) + gain) / period | |
| avg_loss = (avg_loss * (period - 1) + loss) / period | |
| if avg_loss == 0: | |
| return 100.0 | |
| rs = avg_gain / avg_loss | |
| return 100.0 - (100.0 / (1.0 + rs)) | |
| # --- 5. 企业微信机器人推送 --- | |
| class WeWorkBotPusher: | |
| def __init__(self, webhook_url): | |
| self.webhook_url = webhook_url | |
| def send_text(self, content): | |
| if not self.webhook_url: | |
| print("⚠️ 未配置 WEWORK_BOT_WEBHOOK,消息未发送:\n" + content) | |
| return False | |
| try: | |
| resp = requests.post( | |
| self.webhook_url, | |
| json={"msgtype": "text", "text": {"content": content}}, | |
| headers={"Content-Type": "application/json"}, | |
| timeout=10, | |
| ) | |
| result = resp.json() | |
| if result.get("errcode") == 0: | |
| return True | |
| print(f"❌ 企业微信机器人发送失败: {result}") | |
| return False | |
| except Exception as e: | |
| print(f"❌ 企业微信机器人发送异常: {e}") | |
| return False | |
| # --- 6. 币安行情接口 --- | |
| class BinanceAPI: | |
| def __init__(self, logger): | |
| self.logger = logger | |
| self.session = requests.Session() | |
| self.session.headers.update(REQ_HEADERS) | |
| if BINANCE_API_KEY: | |
| self.session.headers.update({"X-MBX-APIKEY": BINANCE_API_KEY}) | |
| def _get(self, base, path, params=None, timeout=10): | |
| try: | |
| r = self.session.get(base + path, params=params, timeout=timeout) | |
| if r.status_code != 200: | |
| self.logger(f"⚠️ 接口 {path} 状态码 {r.status_code}") | |
| return None | |
| return r.json() | |
| except Exception as e: | |
| self.logger(f"⚠️ 接口 {path} 异常: {e}") | |
| return None | |
| # 现货行情 | |
| def ticker_24h(self, symbol): | |
| return self._get(SPOT_BASE, "/api/v3/ticker/24hr", {"symbol": symbol}) | |
| def klines(self, symbol, interval, limit): | |
| return self._get(SPOT_BASE, "/api/v3/klines", | |
| {"symbol": symbol, "interval": interval, "limit": limit}) | |
| def klines_history(self, symbol, interval, days, max_calls=40): | |
| """分页拉取约 days 天的历史K线(用于稳定的长周期校准)""" | |
| ms = INTERVAL_MS.get(interval) | |
| if not ms: | |
| return self.klines(symbol, interval, 1000) | |
| end = int(time.time() * 1000) | |
| cursor = end - int(days * 86_400_000) | |
| out, calls = [], 0 | |
| while cursor < end and calls < max_calls: | |
| data = self._get(SPOT_BASE, "/api/v3/klines", | |
| {"symbol": symbol, "interval": interval, | |
| "startTime": cursor, "limit": 1000}) | |
| calls += 1 | |
| if not data: | |
| break | |
| out.extend(data) | |
| nxt = data[-1][0] + ms | |
| if nxt <= cursor: | |
| break | |
| cursor = nxt | |
| if len(data) < 1000: | |
| break | |
| return out | |
| def depth(self, symbol, limit=500): | |
| return self._get(SPOT_BASE, "/api/v3/depth", {"symbol": symbol, "limit": limit}) | |
| def agg_trades(self, symbol, limit=1000): | |
| return self._get(SPOT_BASE, "/api/v3/aggTrades", {"symbol": symbol, "limit": limit}) | |
| # 合约行情 | |
| def premium_index(self, symbol): | |
| return self._get(FAPI_BASE, "/fapi/v1/premiumIndex", {"symbol": symbol}) | |
| def open_interest(self, symbol): | |
| return self._get(FAPI_BASE, "/fapi/v1/openInterest", {"symbol": symbol}) | |
| # --- 7. 实时强平监控(websocket,可选)--- | |
| class LiquidationTracker: | |
| """ | |
| 订阅币安合约 !forceOrder@arr 全市场强平流,按 symbol 维护近 5 分钟滚动金额。 | |
| websocket-client 未安装时整体禁用。 | |
| """ | |
| def __init__(self, symbols, logger): | |
| self.symbols = set(symbols) | |
| self.logger = logger | |
| self.enabled = ws_client is not None and len(self.symbols) > 0 | |
| # symbol -> deque[(ts, notional)] | |
| self.events = defaultdict(lambda: deque(maxlen=5000)) | |
| self.lock = threading.Lock() | |
| if self.enabled: | |
| threading.Thread(target=self._run, daemon=True).start() | |
| def update_symbols(self, symbols): | |
| self.symbols = set(symbols) | |
| def _on_message(self, _ws, message): | |
| try: | |
| data = json.loads(message) | |
| o = data.get("o", {}) | |
| sym = o.get("s") | |
| if sym not in self.symbols: | |
| return | |
| price = float(o.get("ap") or o.get("p") or 0) | |
| qty = float(o.get("q") or 0) | |
| notional = price * qty | |
| with self.lock: | |
| self.events[sym].append((time.time(), notional)) | |
| except Exception: | |
| pass | |
| def _run(self): | |
| url = f"{FSTREAM_BASE}/!forceOrder@arr" | |
| while True: | |
| try: | |
| self.logger("🔌 连接强平 websocket...") | |
| app = ws_client.WebSocketApp( | |
| url, | |
| on_message=self._on_message, | |
| on_error=lambda _w, e: self.logger(f"⚠️ 强平流错误: {e}"), | |
| ) | |
| app.run_forever(ping_interval=180, ping_timeout=10) | |
| except Exception as e: | |
| self.logger(f"⚠️ 强平流断开,5秒后重连: {e}") | |
| time.sleep(5) | |
| def get_5m_total(self, symbol): | |
| if not self.enabled: | |
| return None | |
| cutoff = time.time() - 300 | |
| with self.lock: | |
| dq = self.events.get(symbol) | |
| if dq is None: | |
| return 0.0 | |
| return sum(n for ts, n in dq if ts >= cutoff) | |
| # --- 8. 告警冷却管理 --- | |
| class AlertManager: | |
| def __init__(self, pusher, logger, stats): | |
| self.pusher = pusher | |
| self.logger = logger | |
| self.stats = stats | |
| self.last_sent = {} # key -> timestamp | |
| def emit(self, alerts): | |
| """alerts: list[dict(category,key,severity,sev_emoji,text,direction)]""" | |
| # 下跌优先:跌 排前面先发 | |
| alerts = sorted(alerts, key=lambda a: (a.get("direction") != "跌",)) | |
| now = time.time() | |
| for a in alerts: | |
| key = a["key"] | |
| cd = COOLDOWN_MIN.get(a["category"], 5) * 60 | |
| if now - self.last_sent.get(key, 0) < cd: | |
| continue | |
| ok = self.pusher.send_text(a["text"]) | |
| self.last_sent[key] = now | |
| self.stats["alerts"] += 1 | |
| if not ok: | |
| self.stats["notify_fails"] += 1 | |
| self.logger(f"{a['sev_emoji']} 推送[{a['category']}] {a.get('title', key)}") | |
| # 方向信号 -> emoji / 文案:🟢涨 / 🔴跌 / 🟡值得关注 | |
| SIGNAL_EMOJI = {"涨": "🟢", "跌": "🔴", "关注": "🟡"} | |
| SIGNAL_LABEL = {"涨": "涨", "跌": "跌", "关注": "值得关注"} | |
| def _floats(klines, idx): | |
| return [float(k[idx]) for k in klines] | |
| def _percentile(sorted_vals, p): | |
| """线性插值分位数;sorted_vals 必须已升序""" | |
| if not sorted_vals: | |
| return None | |
| if len(sorted_vals) == 1: | |
| return sorted_vals[0] | |
| k = (len(sorted_vals) - 1) * p / 100.0 | |
| f = int(k) | |
| c = min(f + 1, len(sorted_vals) - 1) | |
| if f == c: | |
| return sorted_vals[f] | |
| return sorted_vals[f] * (c - k) + sorted_vals[c] * (k - f) | |
| # 校准目标:各事件“理想的每日触发次数”(再乘以全局灵敏度) | |
| # 数值越小 -> 阈值越高 -> 通知越少(只剩关键转折点) | |
| TARGET_PER_DAY = { | |
| "1m": 3.0, "5m": 3.0, "15m": 2.0, "1h": 1.5, | |
| "amp_5m": 2.0, "amp_15m": 1.5, "amp_1h": 1.0, | |
| "vol_anomaly": 3.0, "vol_severe": 0.5, | |
| "ma_dev": 2.0, | |
| } | |
| def _threshold_for_rate(sorted_vals, window_min, target_per_day, floor=0.0): | |
| """ | |
| 给定该周期下的历史样本(升序),返回一个阈值: | |
| 使得历史上“超过该阈值”的次数 ≈ target_per_day 次/天。 | |
| 样本为相邻周期收盘价变动/振幅,互不重叠,故覆盖天数 = n*window/1440。 | |
| """ | |
| n = len(sorted_vals) | |
| if n == 0: | |
| return None | |
| days_span = n * window_min / 1440.0 | |
| exceed = max(1, int(round(target_per_day * days_span))) | |
| idx = min(max(n - exceed, 0), n - 1) | |
| return max(sorted_vals[idx], floor) | |
| def calibrate_thresholds(api, coin_cfg, logger=print, sensitivity=1.0): | |
| """ | |
| 基于约30天历史行情,按“理想每日触发次数”反推阈值,突出关键转折点。 | |
| sensitivity 越大 -> 目标次数越多 -> 阈值越低 -> 通知越多。 | |
| 任一环节失败均跳过,保留原默认值。 | |
| """ | |
| sym = coin_cfg["symbol"] | |
| out = {} | |
| s = max(0.2, float(sensitivity)) | |
| logger(f"🧪 {sym} 拉取约30天历史校准(灵敏度 x{s:g})...") | |
| hist_1m = api.klines_history(sym, "1m", 7) # 1m 噪声平稳,7天已足够稳定 | |
| hist_5m = api.klines_history(sym, "5m", 30) | |
| hist_15m = api.klines_history(sym, "15m", 30) | |
| hist_1h = api.klines_history(sym, "1h", 30) | |
| def cc_returns(kl): | |
| closes = [float(k[4]) for k in kl] | |
| return sorted(abs(closes[i] - closes[i - 1]) / closes[i - 1] * 100 | |
| for i in range(1, len(closes)) if closes[i - 1] > 0) | |
| def amps(kl): | |
| v = [] | |
| for k in kl: | |
| hi, lo = float(k[2]), float(k[3]) | |
| if lo > 0: | |
| v.append((hi - lo) / lo * 100) | |
| return sorted(v) | |
| # 1) 快速涨跌:按每日触发次数目标反推 | |
| pc = {} | |
| src = {"1m": hist_1m, "5m": hist_5m, "15m": hist_15m, "1h": hist_1h} | |
| wmin = {"1m": 1, "5m": 5, "15m": 15, "1h": 60} | |
| for w in ("1m", "5m", "15m", "1h"): | |
| kl = src[w] | |
| if kl and len(kl) > 50: | |
| t = _threshold_for_rate(cc_returns(kl), wmin[w], | |
| TARGET_PER_DAY[w] * s, floor=0.05) | |
| if t: | |
| pc[w] = round(t, 2) | |
| if pc: | |
| base = coin_cfg.get("price_change", {}) | |
| for w in ("1m", "5m", "15m", "1h"): | |
| if w not in pc: | |
| bv = base.get(w, 1.0) | |
| pc[w] = float(bv[0]) if isinstance(bv, list) else float(bv) | |
| out["price_change"] = pc | |
| # 2) 振幅 | |
| for kl, win, key, floor in ((hist_5m, 5, "amp_5m", 0.1), | |
| (hist_15m, 15, "amp_15m", 0.2), | |
| (hist_1h, 60, "amp_1h", 0.3)): | |
| if kl and len(kl) > 50: | |
| t = _threshold_for_rate(amps(kl), win, TARGET_PER_DAY[key] * s, floor) | |
| if t: | |
| out[key] = round(t, 2) | |
| # 3) 成交量倍数 + 5m 累计大额 + 单笔大额:基于 5m 成交额分布 | |
| if hist_5m and len(hist_5m) > 50: | |
| qv = [float(k[7]) for k in hist_5m] | |
| avg5_qvol = sum(qv) / len(qv) | |
| if avg5_qvol > 0: | |
| ratios = sorted(x / avg5_qvol for x in qv) | |
| an = _threshold_for_rate(ratios, 5, TARGET_PER_DAY["vol_anomaly"] * s, 1.5) | |
| sv = _threshold_for_rate(ratios, 5, TARGET_PER_DAY["vol_severe"] * s, | |
| (an or 1.5) + 0.5) | |
| if an: | |
| out["vol_anomaly_mult"] = round(an, 1) | |
| if sv: | |
| out["vol_severe_mult"] = round(max(sv, (an or 1.5) + 0.5), 1) | |
| out["cum_trade_5m_usdt"] = round(avg5_qvol * (an or 2.5)) | |
| # 单笔大额:一笔成交达到约 3 分钟的平均成交额才算“巨单” | |
| avg1m_qvol = avg5_qvol / 5.0 | |
| out["big_trade_usdt"] = round(max(avg1m_qvol * 3.0 / s, 5000)) | |
| # 4) 偏离 MA20:1h 历史偏离 | |
| if hist_1h and len(hist_1h) > 40: | |
| closes = [float(k[4]) for k in hist_1h] | |
| devs = [] | |
| for i in range(20, len(closes)): | |
| ma = sum(closes[i - 20:i]) / 20 | |
| if ma > 0: | |
| devs.append(abs(closes[i] - ma) / ma * 100) | |
| t = _threshold_for_rate(sorted(devs), 60, TARGET_PER_DAY["ma_dev"] * s, 0.5) | |
| if t: | |
| out["ma_deviation_pct"] = round(t, 2) | |
| # 5) 价差:以当前盘口价差为基准放大 | |
| ob = api.depth(sym, 100) | |
| if ob and ob.get("bids") and ob.get("asks"): | |
| bb, ba = float(ob["bids"][0][0]), float(ob["asks"][0][0]) | |
| mid = (bb + ba) / 2 | |
| if mid > 0: | |
| sp = (ba - bb) / mid * 100 | |
| out["spread_pct"] = round(max(sp * 3, 0.02), 3) | |
| out["spread_severe_pct"] = round(max(sp * 6, out["spread_pct"] * 2), 3) | |
| # 6) 强平:按 24h 成交额量级缩放 | |
| t24 = api.ticker_24h(sym) | |
| if t24: | |
| qvol24 = float(t24.get("quoteVolume") or 0) | |
| if qvol24 > 0: | |
| avg5 = qvol24 / 288.0 | |
| out["liq_5m_usdt"] = round(max(avg5 * 0.5, 10000)) | |
| out["liq_severe_usdt"] = round(out["liq_5m_usdt"] * 4) | |
| # 7) 资金费率:历史费率分位(仅合约启用时) | |
| if coin_cfg.get("enable_futures"): | |
| fr = api._get(FAPI_BASE, "/fapi/v1/fundingRate", | |
| {"symbol": coin_cfg["futures_symbol"], "limit": 500}) | |
| if fr: | |
| try: | |
| rates = [float(x["fundingRate"]) * 100 for x in fr] | |
| pos = sorted(r for r in rates if r > 0) | |
| neg = sorted(r for r in rates if r < 0) | |
| if pos: | |
| high = round(max(_percentile(pos, 80), 0.005), 4) | |
| hot = round(max(_percentile(pos, 97), high * 1.5), 4) | |
| out["funding_high"] = high | |
| out["funding_hot"] = hot | |
| if neg: | |
| out["funding_neg"] = round(min(_percentile(neg, 20), -0.005), 4) | |
| except (KeyError, ValueError, TypeError): | |
| pass | |
| logger(f"✅ {sym} 校准完成,覆盖 {len(out)} 组参数") | |
| return out | |
| # --- 9. 单币种评估器 --- | |
| class CoinEvaluator: | |
| WINDOW_BACK = {"1m": 2, "5m": 6, "15m": 16, "1h": 61} | |
| WINDOW_CN = {"1m": "1分钟", "5m": "5分钟", "15m": "15分钟", "1h": "1小时"} | |
| def __init__(self, name, api, liq_tracker, logger): | |
| self.name = name | |
| self.api = api | |
| self.liq = liq_tracker | |
| self.logger = logger | |
| # 状态 | |
| self.break_24h_since = {} # 'high'/'low' -> ts | |
| self.break_7d_alerted = {} | |
| self.prev_walls = {} # 'bid'/'ask' -> notional | |
| self.oi_history = deque(maxlen=2000) # (ts, oi) | |
| self.last_price = None | |
| self.last_funding_pct = None | |
| self.last_rsi = None | |
| def evaluate(self, cfg): | |
| sym = cfg["symbol"] | |
| alerts = [] | |
| ticker = self.api.ticker_24h(sym) | |
| k1 = self.api.klines(sym, "1m", 61) | |
| if not ticker or not k1 or len(k1) < 17: | |
| return alerts, None # 数据不足 | |
| closes = _floats(k1, 4) | |
| highs = _floats(k1, 2) | |
| lows = _floats(k1, 3) | |
| qvols = _floats(k1, 7) # quoteAssetVolume | |
| price = float(ticker.get("lastPrice") or closes[-1]) | |
| high24 = float(ticker.get("highPrice") or 0) | |
| low24 = float(ticker.get("lowPrice") or 0) | |
| qvol24 = float(ticker.get("quoteVolume") or 0) | |
| self.last_price = price | |
| def add(category, key, direction, headline, body): | |
| emoji = SIGNAL_EMOJI[direction] | |
| text = f"{emoji} {self.name} {headline}\n{head}\n{body}" | |
| alerts.append({"category": category, "key": f"{self.name}_{key}", | |
| "direction": direction, "sev_emoji": emoji, | |
| "title": headline, "text": text}) | |
| head = f"【{self.name}】现价 {fmt_price(price)} USDT" | |
| # 1) 快速涨跌(方向决定颜色) | |
| for win, back in self.WINDOW_BACK.items(): | |
| if len(closes) <= back: | |
| continue | |
| then = closes[-back] | |
| if then == 0: | |
| continue | |
| pct = (price - then) / then * 100 | |
| thr = cfg["price_change"][win] | |
| if isinstance(thr, list): | |
| thr = thr[0] | |
| if abs(pct) >= thr: | |
| direction = "涨" if pct >= 0 else "跌" | |
| arrow = "📈" if pct >= 0 else "📉" | |
| add("price_change", f"pc_{win}_{direction}", direction, | |
| f"{self.WINDOW_CN[win]}快速{direction} {arrow}", | |
| f"{self.WINDOW_CN[win]}内{direction}幅 {pct:+.2f}%") | |
| # 2) 关键价位 | |
| if cfg.get("cost_price", 0) > 0: | |
| diff = (price - cfg["cost_price"]) / cfg["cost_price"] * 100 | |
| if abs(diff) <= cfg["near_cost_pct"]: | |
| add("near_cost", "near_cost", "关注", "接近成本价", | |
| f"成本价 {fmt_price(cfg['cost_price'])},当前偏离 {diff:+.2f}%") | |
| for label, pkey in (("目标价", "target_price"), ("止损价", "stop_price")): | |
| kp = cfg.get(pkey, 0) | |
| if kp > 0: | |
| diff = (price - kp) / kp * 100 | |
| if abs(diff) <= cfg["near_key_pct"]: | |
| add("near_key", f"near_{pkey}", "关注", f"接近{label}", | |
| f"{label} {fmt_price(kp)},当前偏离 {diff:+.2f}%") | |
| # 3) 突破 24h 高/低(突破后维持一段时间) | |
| self._check_break_24h(cfg, price, high24, low24, head, add) | |
| # 4) 突破 7 日高/低 | |
| self._check_break_7d(cfg, sym, price, head, add) | |
| # 5) 成交量 | |
| if qvol24 > 0 and len(qvols) >= 5: | |
| cur5 = sum(qvols[-5:]) | |
| avg5 = qvol24 / 288.0 | |
| if avg5 > 0: | |
| ratio = cur5 / avg5 | |
| if ratio >= cfg["vol_severe_mult"]: | |
| add("volume", "vol_severe", "关注", f"严重放量 x{ratio:.1f}", | |
| f"近5分钟成交 {fmt_usdt(cur5)},为24h均量的 {ratio:.1f} 倍") | |
| elif ratio >= cfg["vol_anomaly_mult"]: | |
| add("volume", "vol_anomaly", "关注", f"成交量异常 x{ratio:.1f}", | |
| f"近5分钟成交 {fmt_usdt(cur5)},为24h均量的 {ratio:.1f} 倍") | |
| # 6) 振幅 | |
| for win, n, thr_key in (("5分钟", 5, "amp_5m"), ("15分钟", 15, "amp_15m"), ("1小时", 60, "amp_1h")): | |
| if len(highs) >= n: | |
| hi = max(highs[-n:]) | |
| lo = min(lows[-n:]) | |
| if lo > 0: | |
| amp = (hi - lo) / lo * 100 | |
| if amp >= cfg[thr_key]: | |
| add("amplitude", f"amp_{n}", "关注", f"{win}振幅过大 {amp:.2f}%", | |
| f"{win}高低差 {amp:.2f}%({fmt_price(lo)} ~ {fmt_price(hi)})") | |
| # 7) 大额成交 | |
| self._check_trades(cfg, sym, head, add) | |
| # 8) 盘口(价差 / 失衡 / 买卖墙) | |
| self._check_orderbook(cfg, sym, price, head, add) | |
| # 9) 价格偏离均线 | |
| self._check_ma(cfg, sym, price, head, add) | |
| # 10) 连续 K 线 | |
| self._check_consecutive(cfg, sym, head, add) | |
| # 11) RSI | |
| self._check_rsi(cfg, sym, head, add) | |
| # 12) 合约类 | |
| if cfg.get("enable_futures"): | |
| self._check_futures(cfg, head, add) | |
| snapshot = { | |
| "price": price, "high24": high24, "low24": low24, | |
| "rsi": self.last_rsi, "funding": self.last_funding_pct, | |
| } | |
| return alerts, snapshot | |
| # ---- 子检查 ---- | |
| def _check_break_24h(self, cfg, price, high24, low24, head, add): | |
| now = time.time() | |
| hold = cfg["break_24h_hold_sec"] | |
| thr = cfg["break_24h_pct"] | |
| # 突破高点 | |
| if high24 > 0 and price >= high24 * (1 + thr / 100): | |
| self.break_24h_since.setdefault("high", now) | |
| if now - self.break_24h_since["high"] >= hold: | |
| add("break_24h", "break_24h_high", "涨", "突破24h高点 📈", | |
| f"已突破24h高 {fmt_price(high24)} 超 {thr}% 并维持 {hold // 60} 分钟以上") | |
| else: | |
| self.break_24h_since.pop("high", None) | |
| # 突破低点 | |
| if low24 > 0 and price <= low24 * (1 - thr / 100): | |
| self.break_24h_since.setdefault("low", now) | |
| if now - self.break_24h_since["low"] >= hold: | |
| add("break_24h", "break_24h_low", "跌", "突破24h低点 📉", | |
| f"已跌破24h低 {fmt_price(low24)} 超 {thr}% 并维持 {hold // 60} 分钟以上") | |
| else: | |
| self.break_24h_since.pop("low", None) | |
| def _check_break_7d(self, cfg, sym, price, head, add): | |
| kd = self.api.klines(sym, "1d", 8) | |
| if not kd or len(kd) < 2: | |
| return | |
| # 取已收盘的前 7 日(排除当前未收盘日) | |
| closed = kd[:-1][-7:] | |
| hi7 = max(float(k[2]) for k in closed) | |
| lo7 = min(float(k[3]) for k in closed) | |
| thr = cfg["break_7d_pct"] | |
| if hi7 > 0 and price >= hi7 * (1 + thr / 100): | |
| add("break_7d", "break_7d_high", "涨", "突破7日高点 📈", | |
| f"已突破7日高 {fmt_price(hi7)} 超 {thr}%") | |
| if lo7 > 0 and price <= lo7 * (1 - thr / 100): | |
| add("break_7d", "break_7d_low", "跌", "跌破7日低点 📉", | |
| f"已跌破7日低 {fmt_price(lo7)} 超 {thr}%") | |
| def _check_trades(self, cfg, sym, head, add): | |
| trades = self.api.agg_trades(sym, 1000) | |
| if not trades: | |
| return | |
| cutoff_ms = (time.time() - 300) * 1000 | |
| big_thr = cfg["big_trade_usdt"] | |
| cum = 0.0 | |
| max_single = 0.0 | |
| for t in trades: | |
| try: | |
| notional = float(t["p"]) * float(t["q"]) | |
| ts = t.get("T", 0) | |
| except (KeyError, ValueError): | |
| continue | |
| if ts >= cutoff_ms: | |
| cum += notional | |
| if notional > max_single: | |
| max_single = notional | |
| if max_single >= big_thr: | |
| add("big_trade", "big_trade", "关注", f"单笔大额成交 {fmt_usdt(max_single)}", | |
| f"出现单笔成交 {fmt_usdt(max_single)}(阈值 {fmt_usdt(big_thr)})") | |
| if cum >= cfg["cum_trade_5m_usdt"]: | |
| add("cum_trade", "cum_trade", "关注", f"连续大额成交 {fmt_usdt(cum)}", | |
| f"近5分钟累计成交 {fmt_usdt(cum)}(阈值 {fmt_usdt(cfg['cum_trade_5m_usdt'])})") | |
| def _check_orderbook(self, cfg, sym, price, head, add): | |
| ob = self.api.depth(sym, 1000) | |
| if not ob or not ob.get("bids") or not ob.get("asks"): | |
| return | |
| bids = [(float(p), float(q)) for p, q in ob["bids"]] | |
| asks = [(float(p), float(q)) for p, q in ob["asks"]] | |
| best_bid = bids[0][0] | |
| best_ask = asks[0][0] | |
| mid = (best_bid + best_ask) / 2 | |
| if mid <= 0: | |
| return | |
| # 价差 | |
| spread = (best_ask - best_bid) / mid * 100 | |
| if spread >= cfg["spread_severe_pct"]: | |
| add("spread", "spread", "关注", f"买卖价差异常 {spread:.3f}%", | |
| f"当前价差 {spread:.3f}%(严重阈值 {cfg['spread_severe_pct']}%)") | |
| elif spread >= cfg["spread_pct"]: | |
| add("spread", "spread", "关注", f"买卖价差偏大 {spread:.3f}%", | |
| f"当前价差 {spread:.3f}%(阈值 {cfg['spread_pct']}%)") | |
| # 盘口失衡 | |
| dp = cfg["imbalance_depth_pct"] / 100 | |
| bid_usdt = sum(p * q for p, q in bids if p >= mid * (1 - dp)) | |
| ask_usdt = sum(p * q for p, q in asks if p <= mid * (1 + dp)) | |
| if bid_usdt > 0 and ask_usdt > 0: | |
| if bid_usdt / ask_usdt >= cfg["imbalance_ratio"]: | |
| add("imbalance", "imbalance_buy", "涨", "买盘失衡(买强)", | |
| f"±{cfg['imbalance_depth_pct']}%深度内 买/卖 = {bid_usdt / ask_usdt:.1f} 倍") | |
| elif ask_usdt / bid_usdt >= cfg["imbalance_ratio"]: | |
| add("imbalance", "imbalance_sell", "跌", "卖盘失衡(卖强)", | |
| f"±{cfg['imbalance_depth_pct']}%深度内 卖/买 = {ask_usdt / bid_usdt:.1f} 倍") | |
| # 买卖墙 + 墙消失 | |
| wp = cfg["wall_depth_pct"] / 100 | |
| self._wall_side(cfg, "bid", "买墙", bids, [p for p, q in bids if p >= mid * (1 - wp)], | |
| bids, mid, wp, head, add, "涨") | |
| self._wall_side(cfg, "ask", "卖墙", asks, [p for p, q in asks if p <= mid * (1 + wp)], | |
| asks, mid, wp, head, add, "跌") | |
| def _wall_side(self, cfg, side, label, _all, _prices, orders, mid, wp, head, add, direction): | |
| if side == "bid": | |
| near = [(p, q) for p, q in orders if p >= mid * (1 - wp)] | |
| else: | |
| near = [(p, q) for p, q in orders if p <= mid * (1 + wp)] | |
| if len(near) < 3: | |
| self.prev_walls[side] = 0.0 | |
| return | |
| notionals = [p * q for p, q in near] | |
| max_n = max(notionals) | |
| avg_n = sum(notionals) / len(notionals) | |
| wall_price = near[notionals.index(max_n)][0] | |
| prev = self.prev_walls.get(side, 0.0) | |
| if avg_n > 0 and max_n >= avg_n * cfg["wall_mult"]: | |
| add("wall", f"wall_{side}", "关注", f"出现{label} {fmt_usdt(max_n)}", | |
| f"{fmt_price(wall_price)} 附近挂单 {fmt_usdt(max_n)},为附近均值的 {max_n / avg_n:.1f} 倍") | |
| self.prev_walls[side] = max_n | |
| else: | |
| # 墙消失检测 | |
| if prev > 0 and max_n <= prev * (1 - cfg["wall_vanish_pct"] / 100): | |
| add("wall_vanish", f"wall_vanish_{side}", | |
| "跌" if side == "bid" else "涨", f"{label}快速撤离", | |
| f"原 {label} {fmt_usdt(prev)} 已减少超 {cfg['wall_vanish_pct']}%") | |
| self.prev_walls[side] = max_n | |
| def _check_ma(self, cfg, sym, price, head, add): | |
| kh = self.api.klines(sym, "1h", 21) | |
| if not kh or len(kh) < 20: | |
| return | |
| closes = _floats(kh[-20:], 4) | |
| ma20 = sum(closes) / 20 | |
| if ma20 <= 0: | |
| return | |
| dev = (price - ma20) / ma20 * 100 | |
| if abs(dev) >= cfg["ma_deviation_pct"]: | |
| direction = "涨" if dev > 0 else "跌" | |
| add("ma_deviation", "ma_deviation", direction, f"价格偏离均线 {dev:+.2f}%", | |
| f"偏离1小时MA20({fmt_price(ma20)}) {dev:+.2f}%") | |
| def _check_consecutive(self, cfg, sym, head, add): | |
| n = int(cfg["consecutive_klines"]) | |
| kl = self.api.klines(sym, "15m", n + 1) | |
| if not kl or len(kl) < n: | |
| return | |
| recent = kl[-n:] | |
| ups = all(float(k[4]) > float(k[1]) for k in recent) | |
| downs = all(float(k[4]) < float(k[1]) for k in recent) | |
| if ups: | |
| add("consecutive", "consecutive_up", "涨", "连续上涨 �", | |
| f"连续 {n} 根15分钟阳线") | |
| elif downs: | |
| add("consecutive", "consecutive_down", "跌", "连续下跌 📉", | |
| f"连续 {n} 根15分钟阴线") | |
| def _check_rsi(self, cfg, sym, head, add): | |
| kl = self.api.klines(sym, "15m", 100) | |
| if not kl or len(kl) < 20: | |
| return | |
| closes = _floats(kl, 4) | |
| rsi = compute_rsi(closes, 14) | |
| self.last_rsi = rsi | |
| if rsi is None: | |
| return | |
| if rsi >= cfg["rsi_high"]: | |
| add("rsi", "rsi_high", "关注", f"RSI 超买 {rsi:.0f}", | |
| f"15分钟 RSI = {rsi:.1f}(超买阈值 {cfg['rsi_high']}),警惕回调") | |
| elif rsi <= cfg["rsi_low"]: | |
| add("rsi", "rsi_low", "关注", f"RSI 超卖 {rsi:.0f}", | |
| f"15分钟 RSI = {rsi:.1f}(超卖阈值 {cfg['rsi_low']}),警惕反弹") | |
| def _check_futures(self, cfg, head, add): | |
| fsym = cfg["futures_symbol"] | |
| # 资金费率 | |
| pi = self.api.premium_index(fsym) | |
| if pi and pi.get("lastFundingRate") is not None: | |
| rate = float(pi["lastFundingRate"]) * 100 # 转百分比 | |
| self.last_funding_pct = rate | |
| if rate >= cfg["funding_hot"]: | |
| add("funding", "funding_hot", "关注", f"资金费率过热 {rate:+.4f}%", | |
| f"当前资金费率 {rate:+.4f}%/期(过热阈值 {cfg['funding_hot']}%),多头拥挤") | |
| elif rate >= cfg["funding_high"]: | |
| add("funding", "funding_high", "关注", f"资金费率偏高 {rate:+.4f}%", | |
| f"当前资金费率 {rate:+.4f}%/期(偏高阈值 {cfg['funding_high']}%)") | |
| elif rate <= cfg["funding_neg"]: | |
| add("funding", "funding_neg", "关注", f"资金费率偏负 {rate:+.4f}%", | |
| f"当前资金费率 {rate:+.4f}%/期(偏负阈值 {cfg['funding_neg']}%),空头拥挤") | |
| # 持仓量变化 | |
| oi_data = self.api.open_interest(fsym) | |
| if oi_data and oi_data.get("openInterest") is not None: | |
| oi = float(oi_data["openInterest"]) | |
| now = time.time() | |
| self.oi_history.append((now, oi)) | |
| for win_min, thr_key, cn in ((15, "oi_15m", "15分钟"), (60, "oi_1h", "1小时"), (240, "oi_4h", "4小时")): | |
| past = self._oi_at(now - win_min * 60) | |
| if past and past > 0: | |
| chg = (oi - past) / past * 100 | |
| if abs(chg) >= cfg[thr_key]: | |
| direction = "增" if chg > 0 else "减" | |
| add("oi", f"oi_{win_min}", "关注", f"持仓量{cn}{direction} {chg:+.1f}%", | |
| f"持仓量{cn}内{direction} {chg:+.1f}%(阈值 {cfg[thr_key]}%)") | |
| # 强平 | |
| liq5 = self.liq.get_5m_total(fsym) if self.liq else None | |
| if liq5 is not None: | |
| if liq5 >= cfg["liq_severe_usdt"]: | |
| add("liquidation", "liq_severe", "关注", f"严重强平 {fmt_usdt(liq5)}", | |
| f"近5分钟强平 {fmt_usdt(liq5)}(严重阈值 {fmt_usdt(cfg['liq_severe_usdt'])})") | |
| elif liq5 >= cfg["liq_5m_usdt"]: | |
| add("liquidation", "liq_5m", "关注", f"强平金额异常 {fmt_usdt(liq5)}", | |
| f"近5分钟强平 {fmt_usdt(liq5)}(阈值 {fmt_usdt(cfg['liq_5m_usdt'])})") | |
| def _oi_at(self, target_ts): | |
| """返回最接近 target_ts 的历史持仓量(要求有足够久的样本)""" | |
| best = None | |
| best_diff = None | |
| for ts, oi in self.oi_history: | |
| diff = abs(ts - target_ts) | |
| if best_diff is None or diff < best_diff: | |
| best_diff = diff | |
| best = oi | |
| # 样本与目标时间差太大(超过窗口一半)则视为无效 | |
| if best_diff is not None and best_diff <= 600: | |
| return best | |
| return None | |
| # --- 10. 监控主逻辑 --- | |
| class CryptoMonitor: | |
| def __init__(self): | |
| self.logs = deque(maxlen=80) | |
| self.status_text = "初始化中..." | |
| self.next_wakeup = None | |
| self.lock = threading.Lock() | |
| self.config = load_config() | |
| self.snapshots = {} # coin -> 最新行情快照 | |
| self.stats = {"alerts": 0, "notify_fails": 0, "api_fails": 0, "loops": 0} | |
| self.last_daily_report_date = None | |
| # 配置文件不存在时,首次启动自动校准默认阈值 | |
| self._auto_calibrate_needed = not os.path.exists(CONFIG_FILE) | |
| self.api = BinanceAPI(self.log) | |
| self.pusher = WeWorkBotPusher(WEWORK_BOT_WEBHOOK) | |
| self.alert_mgr = AlertManager(self.pusher, self.log, self.stats) | |
| self.liq = LiquidationTracker(self._futures_symbols(), self.log) | |
| self.evaluators = {} | |
| self._rebuild_evaluators() | |
| self.thread = threading.Thread(target=self._run_loop, daemon=True) | |
| self.thread.start() | |
| def log(self, msg): | |
| ts = get_beijing_now().strftime("%H:%M:%S") | |
| entry = f"[{ts}] {msg}" | |
| print(entry) | |
| self.logs.appendleft(entry) | |
| def _futures_symbols(self): | |
| return [c["futures_symbol"] for c in self.config["coins"].values() | |
| if c.get("enable_futures")] | |
| def _rebuild_evaluators(self): | |
| with self.lock: | |
| for name in list(self.evaluators.keys()): | |
| if name not in self.config["coins"]: | |
| self.evaluators.pop(name, None) | |
| for name in self.config["coins"]: | |
| if name not in self.evaluators: | |
| self.evaluators[name] = CoinEvaluator(name, self.api, self.liq, self.log) | |
| def update_config(self, new_cfg): | |
| """由前端调用:保存并热更新配置""" | |
| with self.lock: | |
| self.config = new_cfg | |
| save_config(new_cfg) | |
| self.liq.update_symbols(self._futures_symbols()) | |
| self._rebuild_evaluators() | |
| self.log("⚙️ 配置已更新并保存") | |
| def calibrate_all(self): | |
| """对所有币种执行历史校准并保存(后台首次启动调用)""" | |
| with self.lock: | |
| cfg = copy.deepcopy(self.config) | |
| scale = float(cfg["global"].get("alert_sensitivity", 1.0)) | |
| for name, coin in cfg["coins"].items(): | |
| try: | |
| overrides = calibrate_thresholds(self.api, coin, self.log, scale) | |
| cfg["coins"][name] = _deep_merge(coin, overrides) | |
| except Exception as e: | |
| self.log(f"⚠️ {name} 校准失败: {e}") | |
| with self.lock: | |
| self.config = cfg | |
| save_config(cfg) | |
| self._rebuild_evaluators() | |
| def _maybe_daily_report(self): | |
| now = get_beijing_now() | |
| today = now.strftime("%Y-%m-%d") | |
| hour = int(self.config["global"].get("daily_report_hour", 9)) | |
| if now.time() >= dt_time(hour, 0) and self.last_daily_report_date != today: | |
| if self.last_daily_report_date is None: | |
| # 首次启动当天不发,避免重启刷屏 | |
| self.last_daily_report_date = today | |
| return | |
| coins = "、".join(self.config["coins"].keys()) | |
| msg = ( | |
| f"🚀 虚拟货币价格监控日报\n\n" | |
| f"{now.strftime('%Y年%m月%d日')} 服务正常运行中。\n" | |
| f"监控币种:{coins}\n\n" | |
| f"昨日统计:\n" | |
| f"推送告警:{self.stats['alerts']} 次\n" | |
| f"发送失败:{self.stats['notify_fails']} 次\n" | |
| f"接口失败:{self.stats['api_fails']} 次\n" | |
| f"轮询次数:{self.stats['loops']} 次" | |
| ) | |
| self.pusher.send_text(msg) | |
| self.log(f"🔔 已发送每日报告: {today}") | |
| self.last_daily_report_date = today | |
| self.stats.update({"alerts": 0, "notify_fails": 0, "api_fails": 0, "loops": 0}) | |
| def _run_loop(self): | |
| self.log("🚀 虚拟货币价格监控服务已启动") | |
| if not WEWORK_BOT_WEBHOOK: | |
| self.log("⚠️ 未配置 WEWORK_BOT_WEBHOOK,仅记录日志不推送") | |
| if ws_client is None: | |
| self.log("⚠️ 未安装 websocket-client,强平监控已禁用") | |
| if self._auto_calibrate_needed: | |
| self.log("🧪 首次启动:根据历史行情自动校准各币种阈值...") | |
| try: | |
| self.calibrate_all() | |
| self.log("✅ 首次自动校准完成并已保存") | |
| except Exception as e: | |
| self.log(f"⚠️ 自动校准异常: {e}") | |
| self._auto_calibrate_needed = False | |
| while True: | |
| try: | |
| with self.lock: | |
| cfg = copy.deepcopy(self.config) | |
| interval = int(cfg["global"].get("poll_interval_sec", 30)) | |
| self._maybe_daily_report() | |
| self.status_text = "🔥 监控中" | |
| total_alerts = [] | |
| for name, coin_cfg in cfg["coins"].items(): | |
| evaluator = self.evaluators.get(name) | |
| if not evaluator: | |
| continue | |
| try: | |
| alerts, snap = evaluator.evaluate(coin_cfg) | |
| if snap: | |
| self.snapshots[name] = snap | |
| else: | |
| self.stats["api_fails"] += 1 | |
| total_alerts.extend(alerts) | |
| except Exception as e: | |
| self.log(f"❌ {name} 评估异常: {e}") | |
| if total_alerts: | |
| self.alert_mgr.emit(total_alerts) | |
| self.stats["loops"] += 1 | |
| now = get_beijing_now() | |
| self.next_wakeup = now + timedelta(seconds=interval) | |
| time.sleep(interval) | |
| except Exception as e: | |
| self.log(f"❌ 主循环异常: {e}") | |
| time.sleep(30) | |
| # --- 11. Streamlit 前端 --- | |
| def get_monitor(): | |
| return CryptoMonitor() | |
| # 阈值字段的中文标签与分组(用于动态生成编辑器) | |
| FIELD_GROUPS = [ | |
| ("关键价位", [ | |
| ("cost_price", "成本价 (0=不监控)"), | |
| ("target_price", "目标价 (0=不监控)"), | |
| ("stop_price", "止损价 (0=不监控)"), | |
| ("near_cost_pct", "接近成本价阈值 %"), | |
| ("near_key_pct", "接近目标/止损阈值 %"), | |
| ]), | |
| ("突破", [ | |
| ("break_24h_pct", "突破24h高低 %"), | |
| ("break_24h_hold_sec", "突破维持秒数"), | |
| ("break_7d_pct", "突破7日高低 %"), | |
| ]), | |
| ("成交量与振幅", [ | |
| ("vol_anomaly_mult", "5m放量倍数"), | |
| ("vol_severe_mult", "5m严重放量倍数"), | |
| ("amp_5m", "5m振幅 %"), | |
| ("amp_15m", "15m振幅 %"), | |
| ("amp_1h", "1h振幅 %"), | |
| ]), | |
| ("大额成交", [ | |
| ("big_trade_usdt", "单笔大额 USDT"), | |
| ("cum_trade_5m_usdt", "5m累计大额 USDT"), | |
| ]), | |
| ("盘口", [ | |
| ("spread_pct", "价差 %"), | |
| ("spread_severe_pct", "严重价差 %"), | |
| ("imbalance_depth_pct", "失衡统计深度 %"), | |
| ("imbalance_ratio", "买卖失衡倍数"), | |
| ("wall_depth_pct", "买卖墙深度 %"), | |
| ("wall_mult", "墙厚度倍数"), | |
| ("wall_vanish_pct", "墙消失减少 %"), | |
| ]), | |
| ("技术指标", [ | |
| ("ma_deviation_pct", "偏离1h MA20 %"), | |
| ("consecutive_klines", "连续15m K线根数"), | |
| ("rsi_high", "RSI 超买"), | |
| ("rsi_low", "RSI 超卖"), | |
| ]), | |
| ("合约 (需启用)", [ | |
| ("funding_high", "资金费率偏高 %"), | |
| ("funding_hot", "资金费率过热 %"), | |
| ("funding_neg", "资金费率偏负 %"), | |
| ("oi_15m", "持仓15m变化 %"), | |
| ("oi_1h", "持仓1h变化 %"), | |
| ("oi_4h", "持仓4h变化 %"), | |
| ("liq_5m_usdt", "5m强平 USDT"), | |
| ("liq_severe_usdt", "严重强平 USDT"), | |
| ]), | |
| ] | |
| INT_FIELDS = {"break_24h_hold_sec", "consecutive_klines"} | |
| def render_config_editor(monitor): | |
| cfg = copy.deepcopy(monitor.config) | |
| st.subheader("⚙️ 监控参数设置") | |
| with st.expander("全局设置", expanded=False): | |
| g = cfg["global"] | |
| g["poll_interval_sec"] = st.number_input( | |
| "轮询间隔(秒)", min_value=5, max_value=600, | |
| value=int(g.get("poll_interval_sec", 30)), step=5) | |
| g["daily_report_hour"] = st.number_input( | |
| "每日报告时间(小时)", min_value=0, max_value=23, | |
| value=int(g.get("daily_report_hour", 9))) | |
| g["alert_sensitivity"] = st.number_input( | |
| "通知灵敏度(越大通知越多,建议 0.5~2.0)", min_value=0.2, max_value=5.0, | |
| value=float(g.get("alert_sensitivity", 1.0)), step=0.1, format="%.1f", | |
| help="校准时按“理想每日触发次数 × 灵敏度”反推阈值;调整后点币种页的“历史自动校准”生效") | |
| coin_names = list(cfg["coins"].keys()) | |
| tabs = st.tabs(coin_names + ["➕ 新增币种"]) | |
| for i, name in enumerate(coin_names): | |
| with tabs[i]: | |
| coin = cfg["coins"][name] | |
| cc1, cc2, cc3 = st.columns(3) | |
| with cc1: | |
| coin["symbol"] = st.text_input( | |
| "现货交易对", value=coin["symbol"], key=f"{name}_symbol") | |
| with cc2: | |
| coin["futures_symbol"] = st.text_input( | |
| "合约交易对", value=coin["futures_symbol"], key=f"{name}_fsym") | |
| with cc3: | |
| coin["enable_futures"] = st.checkbox( | |
| "启用合约监控", value=coin.get("enable_futures", False), key=f"{name}_futenable") | |
| cal_col, _ = st.columns([1, 3]) | |
| if cal_col.button("📈 历史自动校准", key=f"cal_{name}", | |
| help="拉取该币种历史行情,自动推算更贴合的阈值"): | |
| with st.spinner(f"正在根据历史行情校准 {name} ..."): | |
| scale = float(cfg["global"].get("alert_sensitivity", 1.0)) | |
| overrides = calibrate_thresholds(monitor.api, coin, monitor.log, scale) | |
| cfg["coins"][name] = _deep_merge(coin, overrides) | |
| monitor.update_config(cfg) | |
| st.success(f"{name} 已根据历史校准并保存") | |
| st.rerun() | |
| st.markdown("**快速涨跌触发阈值 %(绝对值,方向🟢涨/🔴跌自动判断)**") | |
| pc = coin["price_change"] | |
| pcols = st.columns(4) | |
| for idx, win in enumerate(["1m", "5m", "15m", "1h"]): | |
| val = pc.get(win, 1.0) | |
| if isinstance(val, list): | |
| val = val[0] if val else 1.0 | |
| pc[win] = pcols[idx].number_input( | |
| win, value=float(val), step=0.05, | |
| key=f"{name}_{win}_pc", format="%.2f") | |
| for group_name, fields in FIELD_GROUPS: | |
| with st.expander(group_name, expanded=False): | |
| cols = st.columns(2) | |
| for j, (fkey, flabel) in enumerate(fields): | |
| with cols[j % 2]: | |
| cur = coin.get(fkey, 0) | |
| if fkey in INT_FIELDS: | |
| coin[fkey] = st.number_input( | |
| flabel, value=int(cur), step=1, key=f"{name}_{fkey}") | |
| else: | |
| coin[fkey] = st.number_input( | |
| flabel, value=float(cur), | |
| step=1000.0 if "usdt" in fkey else 0.1, | |
| key=f"{name}_{fkey}", format="%.4f") | |
| if st.button(f"🗑️ 删除 {name}", key=f"del_{name}"): | |
| if len(coin_names) > 1: | |
| cfg["coins"].pop(name) | |
| monitor.update_config(cfg) | |
| st.rerun() | |
| else: | |
| st.warning("至少保留一个币种") | |
| # 新增币种 | |
| with tabs[-1]: | |
| st.markdown("基于 BTC 默认阈值创建新币种,创建后可在对应标签页微调。") | |
| nc1, nc2 = st.columns(2) | |
| new_name = nc1.text_input("币种代号(如 ETH)", key="new_coin_name") | |
| new_symbol = nc2.text_input("现货交易对(如 ETHUSDT)", key="new_coin_symbol") | |
| if st.button("➕ 创建币种"): | |
| new_name = (new_name or "").strip().upper() | |
| new_symbol = (new_symbol or "").strip().upper() | |
| if not new_name or not new_symbol: | |
| st.warning("请填写币种代号和交易对") | |
| elif new_name in cfg["coins"]: | |
| st.warning("该币种已存在") | |
| else: | |
| template = _btc_defaults() | |
| template["symbol"] = new_symbol | |
| template["futures_symbol"] = new_symbol | |
| template["enable_futures"] = False | |
| with st.spinner(f"正在根据历史行情自动校准 {new_name} 的默认值..."): | |
| scale = float(cfg["global"].get("alert_sensitivity", 1.0)) | |
| overrides = calibrate_thresholds(monitor.api, template, monitor.log, scale) | |
| template = _deep_merge(template, overrides) | |
| cfg["coins"][new_name] = template | |
| monitor.update_config(cfg) | |
| st.success(f"已创建并校准 {new_name}") | |
| st.rerun() | |
| st.divider() | |
| if st.button("💾 保存全部设置", type="primary"): | |
| monitor.update_config(cfg) | |
| st.success("设置已保存并热更新") | |
| def main(): | |
| monitor = get_monitor() | |
| if st_autorefresh: | |
| st_autorefresh(interval=30 * 1000, key="crypto_refresh") | |
| st.title("🚀 虚拟货币价格通知") | |
| c1, c2, c3, c4 = st.columns(4) | |
| c1.metric("运行状态", monitor.status_text) | |
| c2.metric("下次唤醒", | |
| monitor.next_wakeup.strftime("%H:%M:%S") if monitor.next_wakeup else "--") | |
| c3.metric("今日累计告警", monitor.stats["alerts"]) | |
| c4.metric("监控币种", len(monitor.config["coins"])) | |
| st.divider() | |
| # 实时行情快照 | |
| st.subheader("📊 实时行情") | |
| snap_cols = st.columns(max(1, len(monitor.snapshots))) | |
| if monitor.snapshots: | |
| for idx, (name, snap) in enumerate(monitor.snapshots.items()): | |
| with snap_cols[idx]: | |
| rsi = snap.get("rsi") | |
| funding = snap.get("funding") | |
| extra = [] | |
| if rsi is not None: | |
| extra.append(f"RSI {rsi:.0f}") | |
| if funding is not None: | |
| extra.append(f"费率 {funding:+.3f}%") | |
| st.metric(name, f"{fmt_price(snap['price'])}", | |
| " | ".join(extra) if extra else None) | |
| else: | |
| st.caption("等待首次行情拉取...") | |
| st.divider() | |
| col_log, col_cfg = st.columns([2, 3]) | |
| with col_log: | |
| st.subheader("📜 运行日志") | |
| st.text_area("Logs", "\n".join(list(monitor.logs)), height=600, disabled=True) | |
| with col_cfg: | |
| render_config_editor(monitor) | |
| if __name__ == "__main__": | |
| main() | |