BK-GTA-Engine / app.py
max1949's picture
Update app.py
4d92031 verified
import discord
from discord.ext import commands, tasks
from google import genai
from google.genai import types
import os
import math
import random
import re
import datetime
import json
import asyncio
import functools
import logging
import sys
import traceback
import io
import base64
import time
import threading
import socket
from PIL import Image, ImageDraw, ImageFont
import gradio as gr
# ==============================================================================
# 0. 全局异常审计与日志系统 (Institutional Audit Logging)
# ==============================================================================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger("BK_GTA_ARCHITECT")
# ==============================================================================
# 0.5 [HuggingFace 专用] DNS 稳定性修复 (增强版)
# ==============================================================================
def fix_dns():
"""
HuggingFace Spaces 容器 DNS 解析器无法解析 discord.com。
方案: 用 dnspython 通过 Google DNS (8.8.8.8) 直接查询 IP,
然后写入 /etc/hosts 完全绕过容器的 DNS。
"""
try:
with open('/etc/resolv.conf', 'r') as f:
current = f.read()
if '8.8.8.8' not in current:
with open('/etc/resolv.conf', 'a') as f:
f.write('\nnameserver 8.8.8.8\nnameserver 1.1.1.1\n')
logger.info("✅ [DNS] Injected backup DNS to resolv.conf.")
except Exception as e:
logger.warning(f"⚠️ [DNS] resolv.conf write failed: {e}")
hosts_to_resolve = [
'discord.com', 'gateway.discord.gg',
'cdn.discordapp.com', 'api.discord.com',
]
try:
import dns.resolver
resolver = dns.resolver.Resolver(configure=False)
resolver.nameservers = ['8.8.8.8', '1.1.1.1', '8.8.4.4']
resolver.timeout = 10
resolver.lifetime = 10
hosts_lines = []
for hostname in hosts_to_resolve:
try:
answers = resolver.resolve(hostname, 'A')
ip = answers[0].to_text()
hosts_lines.append(f"{ip} {hostname}")
logger.info(f"✅ [DNS] Resolved {hostname}{ip}")
except Exception as e:
logger.warning(f"⚠️ [DNS] Failed to resolve {hostname}: {e}")
if hosts_lines:
try:
with open('/etc/hosts', 'a') as f:
f.write('\n# BK-GTA Discord DNS Fix\n')
for line in hosts_lines:
f.write(line + '\n')
logger.info(f"✅ [DNS] Wrote {len(hosts_lines)} entries to /etc/hosts.")
except PermissionError:
logger.warning("⚠️ [DNS] /etc/hosts is read-only.")
except Exception as e:
logger.warning(f"⚠️ [DNS] hosts write error: {e}")
else:
logger.error("❌ [DNS] Could not resolve any Discord hosts.")
except ImportError:
logger.warning("⚠️ [DNS] dnspython not installed, skipping hosts injection.")
except Exception as e:
logger.error(f"❌ [DNS] Resolution failed: {e}")
fix_dns()
# ==============================================================================
# 1. 机构级架构配置 | Institutional Configuration
# ==============================================================================
DISCORD_TOKEN = os.environ.get('DISCORD_TOKEN', '').strip()
SESSION_ID = random.randint(10000, 99999)
# [CORE] API 密钥池加载逻辑 (多路复用驱动)
raw_keys_pool = os.environ.get('GEMINI_KEYS_POOL', '')
KEY_POOL = []
if raw_keys_pool:
parts = raw_keys_pool.split(',')
for k in parts:
clean_key = k.strip()
if clean_key:
KEY_POOL.append(clean_key)
logger.info(f"✅ [System] Loaded GEMINI_KEYS_POOL with {len(KEY_POOL)} keys.")
else:
single_key = os.environ.get('GEMINI_KEY')
if single_key:
KEY_POOL.append(single_key)
logger.warning("⚠️ [System] Running in SINGLE KEY mode.")
else:
logger.critical("❌ [CRITICAL] NO API KEYS FOUND.")
# 设置 Discord Intent 权限
intents = discord.Intents.default()
intents.message_content = True
intents.members = True
# 默认锁定稳定模型
ACTIVE_MODEL_ID = "gemini-2.0-flash-lite"
# [HuggingFace] 持久化路径
DB_PATH = '/data/bk_gta_v62_unified.json' if os.path.exists('/data') else 'bk_gta_v62_unified.json'
# ==============================================================================
# 2. 状态引擎与数据库 (V62.9 Unified Analytics)
# ==============================================================================
class BKEngine:
"""核心数据引擎:负责量化资产持久化、状态树管理及风险指标对账"""
def __init__(self):
self.db_file = DB_PATH
self.data = self._load_db()
self.pending_challenges = {}
self.active_duels = {}
self.active_bets = {}
self.system_uptime = datetime.datetime.now()
def _load_db(self):
if os.path.exists(self.db_file):
try:
with open(self.db_file, 'r', encoding='utf-8') as f:
content = json.load(f)
if "users" not in content:
content["users"] = {}
if "factions" not in content:
content["factions"] = {
"BULLS": {"score": 0, "emoji": "🐂", "total_wins": 0},
"BEARS": {"score": 0, "emoji": "🐻", "total_wins": 0}
}
if "prize_pool" not in content:
content["prize_pool"] = 1000000.0
return content
except Exception as e:
logger.error(f"DB Load Failure: {e}")
return self._init_schema()
return self._init_schema()
def _init_schema(self):
return {
"version": "62.9",
"users": {},
"factions": {
"BULLS": {"score": 0, "emoji": "🐂", "total_wins": 0},
"BEARS": {"score": 0, "emoji": "🐻", "total_wins": 0}
},
"prize_pool": 1000000.0,
"last_global_sync": str(datetime.datetime.now())
}
def save_db(self):
"""执行物理持久化,确保数据资产安全"""
try:
temp_file = self.db_file + '.tmp'
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(self.data, f, ensure_ascii=False, indent=4)
os.replace(temp_file, self.db_file)
except Exception as e:
logger.error(f"💾 [DB Error] Failed to persist data: {e}")
def get_user(self, user_id):
uid = str(user_id)
if uid not in self.data["users"]:
self.data["users"][uid] = {
"elo": 1200, "points": 1000, "faction": None,
"audits_count": 0, "total_score": 0.0, "highest_score": 0.0,
"win_streak": 0, "mdd_record": 0.0, "sharpe_record": 0.0,
"performance_log": [],
"last_update": str(datetime.datetime.now())
}
return self.data["users"][uid]
def update_audit_stats(self, user_id, score, mdd=None, sharpe=None):
u = self.get_user(user_id)
u["audits_count"] += 1
u["total_score"] += score
if score > u.get("highest_score", 0):
u["highest_score"] = score
if mdd is not None: u["mdd_record"] = mdd
if sharpe is not None: u["sharpe_record"] = sharpe
u["points"] += 50
u["last_update"] = str(datetime.datetime.now())
u["performance_log"].append({"score": score, "time": str(datetime.datetime.now())})
self.save_db()
def update_elo(self, winner_id, loser_id):
"""基于 ELO 等级分系统的量化能力更新算法 (K=32)"""
w = self.get_user(winner_id)
l = self.get_user(loser_id)
K = 32
Rw, Rl = w["elo"], l["elo"]
Ew = 1 / (1 + 10 ** ((Rl - Rw) / 400))
w["elo"] = round(Rw + K * (1 - Ew))
l["elo"] = round(Rl + K * (0 - Ew))
w["win_streak"] += 1
l["win_streak"] = 0
if w["faction"] and w["faction"] in self.data["factions"]:
self.data["factions"][w["faction"]]["score"] += 15
self.data["factions"][w["faction"]]["total_wins"] += 1
self.save_db()
return w["elo"], l["elo"], w["win_streak"]
bk_engine = BKEngine()
# ==============================================================================
# 3. 核心辅助内核 (Helper Kernels & Security)
# ==============================================================================
def safe_extract_float(text, key):
"""鲁棒性数值提取引擎,支持多重前缀匹配"""
try:
patterns = [
rf"{key}:\s*([-+]?\d*\.?\d+)",
rf"{key}\s*=\s*([-+]?\d*\.?\d+)",
rf"【{key}】\s*([-+]?\d*\.?\d+)"
]
for pattern in patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match: return float(match.group(1))
return 0.0
except Exception: return 0.0
def safe_extract_text(text, key_start, key_end=None):
"""逻辑块提取引擎,支持长文本切片"""
try:
if key_end:
pattern = rf"{key_start}:(.*?){key_end}:"
else:
pattern = rf"{key_start}:(.*)"
match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
if match:
content = match.group(1).strip()
return content.replace('*', '').replace('`', '').strip()
return "N/A"
except Exception: return "N/A"
async def call_gemini_multiplex(prompt, attachment=None):
"""
直接 REST API 调用 Gemini,彻底绕过 SDK 的 AFC 额外请求开销。
同时支持 v1beta 和 v1 双端点自动降级。
"""
if not KEY_POOL:
logger.error("Request Blocked: No API Keys available.")
return "ERROR: NO_KEYS"
image_data = None
if attachment:
try:
image_data = await attachment.read()
except Exception as e:
logger.error(f"Attachment Read Error: {e}")
return "ERROR: IMAGE_READ_FAILURE"
import aiohttp
parts = [{"text": prompt}]
if image_data:
b64 = base64.b64encode(image_data).decode("utf-8")
parts.append({"inlineData": {"mimeType": "image/png", "data": b64}})
payload = {"contents": [{"parts": parts}]}
api_versions = ["v1beta", "v1"]
for api_ver in api_versions:
for idx, api_key in enumerate(KEY_POOL):
try:
url = (
f"https://generativelanguage.googleapis.com/{api_ver}/models/"
f"{ACTIVE_MODEL_ID}:generateContent?key={api_key}"
)
async with aiohttp.ClientSession() as session:
async with session.post(
url, json=payload,
timeout=aiohttp.ClientTimeout(total=30)
) as resp:
status = resp.status
if status == 200:
data = await resp.json()
text = data["candidates"][0]["content"]["parts"][0]["text"]
logger.info(f"✅ Gemini OK via {api_ver} key #{idx+1}")
return text
elif status == 429:
logger.warning(f"Key #{idx+1} ({api_ver}) got 429, switching...")
await asyncio.sleep(3)
continue
elif status == 404:
logger.warning(f"Model not found on {api_ver}, trying next version...")
break
else:
body = await resp.text()
logger.error(f"Key #{idx+1} ({api_ver}) HTTP {status}: {body[:300]}")
continue
except asyncio.TimeoutError:
logger.warning(f"Key #{idx+1} timeout, switching...")
continue
except Exception as e:
logger.error(f"Key #{idx+1} error: {e}")
continue
return "ERROR: ALL_KEYS_EXHAUSTED"
# ==============================================================================
# 4. Gradio 保活哨兵 (替代原 Flask keep_alive)
# ==============================================================================
bot = None
def get_status():
"""提供 HTTP 心跳接口,返回逻辑状态"""
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
latency = "N/A"
bot_status = "OFFLINE"
try:
if bot and bot.is_ready():
bot_status = "ONLINE"
latency = f"{round(bot.latency * 1000, 2)}ms"
except Exception as e:
logger.warning(f"Heartbeat probe failed: {e}")
uptime = str(datetime.datetime.now() - bk_engine.system_uptime).split('.')[0]
return {
"status": "BK-GTA-V62.9-FINAL-ACTIVE",
"engine": "BK-GTA-Logic-Kernel",
"discord_bot": bot_status,
"bot_latency": latency,
"server_time": now,
"uptime": uptime,
"sentinel": "PROTECTED",
"integrity_check": "PASSED",
"total_commands": 20,
"keys_pool": len(KEY_POOL),
"factions": len(bk_engine.data.get("factions", {})),
"registered_users": len(bk_engine.data.get("users", {})),
"memory_pool": f"{sys.getsizeof(bk_engine.data) / 1024:.2f} KB"
}
def keep_alive():
"""开启独立守护线程运行 Gradio 服务 (HuggingFace 网关入口 7860)"""
def run_gradio():
try:
gradio_logger = logging.getLogger('gradio')
gradio_logger.setLevel(logging.WARNING)
iface = gr.Interface(
fn=get_status, inputs=None, outputs="json",
title="BK-GTA Sentinel AI",
description="Physical architecture monitor. Logic Integrity: Secure."
)
iface.launch(server_name="0.0.0.0", server_port=7860)
except Exception as e:
logger.error(f"⚠️ [Sentinel] Gradio Server Failure: {e}")
t = threading.Thread(target=run_gradio)
t.daemon = True
t.start()
logger.info("📡 [Sentinel] Keep-alive portal established on port 7860 (Gradio).")
# ==============================================================================
# 5. Bot 工厂 + 全模块命令注册 (Bot Factory & Full Command Registration)
# 总计 19 自定义命令 + 1 内置 help = 20 命令
# ==============================================================================
def create_bot_with_commands():
"""
创建完整的 Bot 实例并注册所有命令/事件。
支持在网络重试时重建 Bot 实例而不丢失任何功能。
"""
global bot
new_bot = commands.Bot(command_prefix='!', intents=intents)
bot = new_bot
# ==================================================================
# 模块一:全网天梯与个人档案 (Rank & Profile System)
# ==================================================================
@new_bot.command()
async def profile(ctx, member: discord.Member = None):
"""展现量化交易员的详细画像与 ELO 评级"""
target = member or ctx.author
u = bk_engine.get_user(target.id)
elo = u['elo']
if elo < 1300: rank, color = "Iron (学徒) 🗑️", 0x717d7e
elif elo < 1500: rank, color = "Bronze (资深) 🥉", 0xcd7f32
elif elo < 1800: rank, color = "Silver (精英) 🥈", 0xbdc3c7
elif elo < 2100: rank, color = "Gold (专家) 🥇", 0xf1c40f
elif elo < 2500: rank, color = "Platinum (大师) 💎", 0x3498db
else: rank, color = "Grandmaster (传奇) 👑", 0x9b59b6
emb = discord.Embed(title=f"📊 Quantitative Profile: {target.display_name}", color=color)
emb.add_field(name="Rating (ELO)", value=f"**{elo}**\nRank: `{rank}`", inline=True)
emb.add_field(name="Assets (Points)", value=f"💰 **{u['points']}**", inline=True)
f_name = u['faction']
f_emoji = bk_engine.data["factions"].get(f_name, {}).get("emoji", "🌐") if f_name else "Ronin"
emb.add_field(name="Faction", value=f"{f_emoji} {f_name or 'Unassigned'}", inline=True)
perf_stats = (
f"Total Audits: {u['audits_count']}\n"
f"Peak Audit Score: {u['highest_score']:.2f}\n"
f"Historical MDD: {u.get('mdd_record', 0):.2f}%\n"
f"Max Win Streak: {u['win_streak']}"
)
emb.add_field(name="Risk-Adjusted Performance", value=f"```yaml\n{perf_stats}\n```", inline=False)
if target.avatar:
emb.set_thumbnail(url=target.avatar.url)
emb.set_footer(text=f"System Session: {SESSION_ID} | Logic V62.9")
await ctx.send(embed=emb)
@new_bot.command()
async def leaderboard(ctx):
"""全量天梯对账,展示排名前 10 的顶级交易员"""
all_users = []
for uid, data in bk_engine.data["users"].items():
all_users.append({"id": uid, "elo": data["elo"], "score": data["highest_score"]})
top_elo = sorted(all_users, key=lambda x: x["elo"], reverse=True)[:10]
desc = ""
for idx, u in enumerate(top_elo):
medal = "🥇" if idx == 0 else "🥈" if idx == 1 else "🥉" if idx == 2 else f"{idx+1}."
desc += f"{medal} <@{u['id']}> : ELO **{u['elo']}** | Peak Score: **{u['score']:.1f}**\n"
emb = discord.Embed(
title="🏆 BK-GTA Institutional Leaderboard",
description=desc or "Scanning market data... (Waiting for submissions)",
color=0xf1c40f
)
f_info = ""
for name, info in bk_engine.data["factions"].items():
f_info += f"{info['emoji']} {name}: {info['score']} pts "
if f_info:
emb.add_field(name="⚔️ Faction War Status", value=f_info, inline=False)
await ctx.send(embed=emb)
# ==================================================================
# 模块二:宏观情报分析 (Macro Intelligence Engine)
# ==================================================================
@new_bot.command()
async def news(ctx):
"""分析新闻/报表截图,并基于机构逻辑给出宏观评分"""
if not ctx.message.attachments:
return await ctx.send("❌ Audit Refused: 请上传新闻图表。")
m = await ctx.send("📡 **[System] Parsing Macro Pulse...**")
prompt = (
"你是一名对冲基金首席策略师。请解析此截图并输出:\n"
"1. 情绪评分 (SCORE): -10 (极度利空) 到 +10 (极度利多)。\n"
"2. 核心变量 (SUMMARY): 30字内概括。\n"
"3. 隐含逻辑 (TAKE): 机构视角的解读 (100字内)。\n"
"输出格式必须严格如下:\n"
"SCORE: [分值]\nSUMMARY: [内容]\nTAKE: [解读]"
)
res = await call_gemini_multiplex(prompt, ctx.message.attachments[0])
if "ERROR" in res:
await m.delete()
return await ctx.send("🔴 [Audit Failure] Intelligence node offline.")
score = safe_extract_float(res, "SCORE")
summary = safe_extract_text(res, "SUMMARY", "TAKE")
take = safe_extract_text(res, "TAKE")
color = 0x2ecc71 if score > 0 else 0xe74c3c if score < 0 else 0xbdc3c7
emb = discord.Embed(title="📰 Macro Sentiment Pulse", color=color)
emb.add_field(name="Sentiment Score", value=f"**{score:+.1f} / 10**", inline=True)
emb.add_field(name="Core Catalyst", value=summary, inline=False)
emb.add_field(name="🏦 Institutional Analysis", value=f"```\n{take}\n```", inline=False)
await m.delete()
await ctx.send(embed=emb)
# ==================================================================
# 模块三:机构级量化审计 (Quantitative Audit Engine)
# ==================================================================
@new_bot.command()
async def evaluate(ctx):
"""基于截图全维度识别回测/实盘参数,并执行风控打分"""
if not ctx.message.attachments:
return await ctx.send("❌ Audit Refused: 请上传结算单或净值曲线。")
sid = random.randint(1000, 9999)
m = await ctx.send(f"🛡️ **[SID:{sid}] Audit Engine Initializing...**")
prompt = (
"你是一名顶级量化风险官。请执行全维度审计。\n"
"【提取要求】:\n"
"1. 核心指标:RET(收益率), MDD(最大回撤), PF(盈利因子), SHARPE(夏普), TRADES(笔数)。\n"
"2. 亮点 (H_ZH): 中文优势概括。\n"
"3. 评分 (S_BASE): 基础评分 (0-100)。\n"
"输出格式:\n"
"RET: [值]\nMDD: [值]\nPF: [值]\nSHARPE: [值]\nTRADES: [笔数]\nS_BASE: [分]\nH_ZH: [优势]"
)
res = await call_gemini_multiplex(prompt, ctx.message.attachments[0])
if "ERROR" in res:
await m.delete()
return await ctx.send("🔴 [Audit Crash] Engine disconnected.")
ret = safe_extract_float(res, "RET")
mdd = safe_extract_float(res, "MDD")
pf = safe_extract_float(res, "PF")
sharpe = safe_extract_float(res, "SHARPE")
trades = int(safe_extract_float(res, "TRADES"))
s_base = safe_extract_float(res, "S_BASE")
if mdd > 35.0:
s_final = 0.0
status = "⚠️ REJECTED (High MDD Risk)"
color = 0xe74c3c
else:
trades_adj = math.log10(max(trades, 1)) / 1.5
s_final = s_base * min(max(trades_adj, 0.5), 1.2) * (sharpe / 2.0 if sharpe > 0 else 0.5)
s_final = min(max(s_final, 0.0), 100.0)
status = "✅ AUDIT CLEARED"
color = 0x2ecc71
# 持久化 + 阵营荣誉贡献
bk_engine.update_audit_stats(ctx.author.id, s_final, mdd, sharpe)
eval_user = bk_engine.get_user(ctx.author.id)
if eval_user["faction"] and eval_user["faction"] in bk_engine.data["factions"]:
bk_engine.data["factions"][eval_user["faction"]]["score"] += 5
bk_engine.save_db()
h_zh = safe_extract_text(res, "H_ZH")
emb = discord.Embed(title="🛡️ BK-GTA Gene Audit Report", color=color)
emb.add_field(name="Verdict", value=f"**{status}**", inline=False)
metrics = (
f"💰 Return: {ret}% | MDD: {mdd}%\n"
f"⚖️ PF: {pf} | Sharpe: {sharpe}\n"
f"🎯 Sample Size: {trades} trades"
)
emb.add_field(name="Data Extraction", value=f"```\n{metrics}\n```", inline=False)
emb.add_field(name="Analysis Score", value=f"**{s_final:.2f} / 100**", inline=True)
emb.add_field(name="Highlights", value=h_zh, inline=False)
faction_note = ""
if eval_user["faction"]:
f_emoji = bk_engine.data["factions"].get(eval_user["faction"], {}).get("emoji", "")
faction_note = f" | {f_emoji} Faction Honor +5"
emb.set_footer(text=f"SID: {sid} | Assets +50{faction_note}")
await m.delete()
await ctx.send(embed=emb)
# ==================================================================
# 模块四:博弈盘口 (Institutional Betting System)
# ==================================================================
@new_bot.command()
@commands.has_permissions(administrator=True)
async def open_bet(ctx, *, topic: str):
"""由合伙人开启的宏观/行情博弈盘口"""
bid = str(random.randint(100, 999))
bk_engine.active_bets[bid] = {
"topic": topic, "up": 0, "down": 0,
"u_up": {}, "u_down": {},
"status": "OPEN",
"start_time": str(datetime.datetime.now())
}
await ctx.send(
f"🎰 **[Bet-ID:{bid}]** New Market Entry!\n"
f"Topic: **{topic}**\n"
f"Join: `!bet {bid} UP/DOWN [Amt]`"
)
@new_bot.command()
async def bet(ctx, bid: str, side: str, amt: int):
"""交易员消耗积分参与盘口对冲"""
if bid not in bk_engine.active_bets:
return await ctx.send("❌ Error: Invalid Bet ID.")
b = bk_engine.active_bets[bid]
if b["status"] != "OPEN":
return await ctx.send("❌ Error: This market is closed.")
u = bk_engine.get_user(ctx.author.id)
if u["points"] < amt or amt <= 0:
return await ctx.send("❌ Error: Insufficient asset points.")
side = side.upper()
if side not in ["UP", "DOWN"]:
return await ctx.send("❌ Error: Choose UP or DOWN.")
u["points"] -= amt
uid = str(ctx.author.id)
pool_key = "u_up" if side == "UP" else "u_down"
b[side.lower()] += amt
b[pool_key][uid] = b[pool_key].get(uid, 0) + amt
bk_engine.save_db()
await ctx.send(f"✅ Position Locked: {ctx.author.name} added {amt} to {side} pool.")
@new_bot.command()
@commands.has_permissions(administrator=True)
async def settle_bet(ctx, bid: str, winner: str):
"""结算盘口并自动执行积分再分配"""
if bid not in bk_engine.active_bets:
return await ctx.send("❌ Error: Invalid Bet ID.")
b = bk_engine.active_bets[bid]
winner = winner.upper()
if winner not in ["UP", "DOWN"]:
return await ctx.send("❌ Error: Winner must be UP or DOWN.")
win_pool = b["u_up"] if winner == "UP" else b["u_down"]
total_win = b["up"] if winner == "UP" else b["down"]
total_lose = b["down"] if winner == "UP" else b["up"]
if total_win > 0:
for uid, amt in win_pool.items():
profit = (amt / total_win) * total_lose
user = bk_engine.get_user(int(uid))
user["points"] += round(amt + profit)
b["status"] = "SETTLED"
bk_engine.save_db()
await ctx.send(f"🏁 Bet `{bid}` Settled! Winner: **{winner}**. Profits Distributed.")
# ==================================================================
# 模块五:PVP 竞技场 (Arena PvP Engine) [增强版]
# ==================================================================
@new_bot.command()
async def challenge(ctx, member: discord.Member, days: int = 1):
"""发起基于量化能力的 1对1 竞技(支持 1-30 天周期)"""
if member.id == ctx.author.id:
return await ctx.send("❌ Logical Error: 不能向自己发起决斗。")
if days < 1 or days > 30:
return await ctx.send("❌ 周期范围: 1-30 天。")
did = f"DUEL-{random.randint(100, 999)}"
bk_engine.pending_challenges[member.id] = {
"challenger": ctx.author.id,
"did": did,
"days": days,
"expiry": datetime.datetime.now() + datetime.timedelta(hours=1)
}
await ctx.send(
f"⚔️ **{ctx.author.name}** 发起决斗请求!目标:{member.mention}\n"
f"ID: `{did}` | 周期: **{days} 天** | `!accept` 接受。"
)
@new_bot.command()
async def accept(ctx):
"""接受竞技场挑战"""
if ctx.author.id not in bk_engine.pending_challenges:
return await ctx.send("❌ Error: No pending challenges.")
req = bk_engine.pending_challenges.pop(ctx.author.id)
did = req["did"]
days = req.get("days", 1)
bk_engine.active_duels[did] = {
"p1": req["challenger"], "p2": ctx.author.id,
"s1": None, "s2": None,
"days": days,
"start": str(datetime.datetime.now())
}
await ctx.send(
f"🔥 **ARENA LOCKED!** Duel ID: `{did}` | 周期: **{days} 天**\n"
f"双方请通过 `!duel_submit {did}` 上传审计截图。"
)
@new_bot.command()
async def duel_submit(ctx, did: str):
"""在对决中提交截图,执行最终胜负判定(含 AI 解说 + 跨阵营加成 + 自动晋升)"""
if did not in bk_engine.active_duels:
return await ctx.send("❌ Error: Duel not active.")
if not ctx.message.attachments:
return await ctx.send("❌ Refused: 请上传结算截图。")
duel = bk_engine.active_duels[did]
if ctx.author.id not in [duel["p1"], duel["p2"]]:
return
m = await ctx.send(f"🔍 **[Arena] Auditing Submission...**")
prompt = (
"Arena Duel Audit: 请识别此交易截图的综合能力评分。\n"
"FORMAT: SCORE: [0-100的数值]"
)
res = await call_gemini_multiplex(prompt, ctx.message.attachments[0])
score = safe_extract_float(res, "SCORE")
if ctx.author.id == duel["p1"]:
duel["s1"] = score
else:
duel["s2"] = score
# 双方都交了 → 执行最终结算
if duel["s1"] is not None and duel["s2"] is not None:
w_score = max(duel["s1"], duel["s2"])
l_score = min(duel["s1"], duel["s2"])
wid = duel["p1"] if duel["s1"] > duel["s2"] else duel["p2"]
lid = duel["p2"] if wid == duel["p1"] else duel["p1"]
new_w, new_l, strk = bk_engine.update_elo(wid, lid)
# ========== Arena Shoutcaster AI 解说 ==========
shout_prompt = (
f"你是一名热血电竞风格的交易竞技解说员(Arena Shoutcaster)。\n"
f"两位交易员刚完成了一场量化对决:\n"
f"🥇 胜者得分: {w_score:.1f} | 🥈 败者得分: {l_score:.1f}\n"
f"请用激昂的中文解说风格,输出 80 字以内的赛事评论。\n"
f"分析胜者核心优势、败者改进方向,像体育赛事解说一样精彩。\n"
f"FORMAT: COMMENTARY: [解说内容]"
)
shout_res = await call_gemini_multiplex(shout_prompt)
commentary = safe_extract_text(shout_res, "COMMENTARY")
if commentary == "N/A":
commentary = (
shout_res[:200]
if shout_res and "ERROR" not in shout_res
else "精彩对决!胜者展现了更出色的风控纪律与策略执行力。"
)
emb = discord.Embed(title="🏆 Arena Final Verdict", color=0xf1c40f)
emb.add_field(
name="🥇 Winner",
value=f"<@{wid}>\nScore: {w_score:.1f}\nNew ELO: {new_w}",
inline=True
)
emb.add_field(
name="🥈 Defeated",
value=f"<@{lid}>\nScore: {l_score:.1f}\nNew ELO: {new_l}",
inline=True
)
emb.add_field(
name="🎙️ Arena Shoutcaster",
value=f"```\n{commentary}\n```",
inline=False
)
# ========== 跨阵营对决荣誉加成 ==========
w_user = bk_engine.get_user(wid)
l_user = bk_engine.get_user(lid)
if (w_user["faction"] and l_user["faction"]
and w_user["faction"] != l_user["faction"]
and w_user["faction"] in bk_engine.data["factions"]):
bonus = 10
bk_engine.data["factions"][w_user["faction"]]["score"] += bonus
w_emoji = bk_engine.data["factions"][w_user["faction"]]["emoji"]
emb.add_field(
name="⚔️ Cross-Faction Bonus",
value=(
f"{w_emoji} **{w_user['faction']}** "
f"获得跨阵营胜利加成 **+{bonus}** 荣誉!"
),
inline=False
)
bk_engine.save_db()
emb.set_footer(text=f"Streak: {strk} | Duel ID: {did}")
await ctx.send(embed=emb)
# ========== 3 连胜自动晋升 Arena Overlord ==========
if strk >= 3:
try:
guild = ctx.guild
role = discord.utils.get(guild.roles, name="Arena Overlord")
if not role:
role = await guild.create_role(
name="Arena Overlord",
color=discord.Color.gold(),
hoist=True,
reason="BK-GTA 3-Streak Auto Promotion"
)
winner_member = guild.get_member(wid)
if winner_member and role not in winner_member.roles:
await winner_member.add_roles(role)
await ctx.send(
f"🎖️ **PROMOTION!** <@{wid}> 达成 **{strk} 连胜**!\n"
f"自动晋升为 **Arena Overlord** 👑"
)
else:
await ctx.send(
f"🔥 <@{wid}> 已达 **{strk} 连胜**!Overlord 霸主地位稳固!"
)
except Exception as e:
logger.warning(f"Auto-promotion failed: {e}")
await ctx.send(
f"🎖️ <@{wid}> 达成 **{strk} 连胜**!"
f"(身份组授予需要 Bot 管理角色权限)"
)
bk_engine.active_duels.pop(did)
else:
await ctx.send(f"✅ Submission Received (Score: {score:.1f}). Waiting for Opponent...")
await m.delete()
# ==================================================================
# 模块六:病毒式海报渲染 (Social Poster Engine)
# ==================================================================
@new_bot.command()
async def viral_audit(ctx):
"""提取审计数据并渲染具备社交传播属性的海报图片"""
if not ctx.message.attachments:
return await ctx.send("❌ 请上传审计截图。")
m = await ctx.send("🎨 **[GPU-Node] Rendering Institutional Poster...**")
prompt = "Extract RET: [val] and SHARPE: [val]. FORMAT: RET: [v] SHARPE: [v]"
res = await call_gemini_multiplex(prompt, ctx.message.attachments[0])
ret, sharpe = safe_extract_float(res, "RET"), safe_extract_float(res, "SHARPE")
try:
W, H = 600, 800
base = Image.new('RGB', (W, H), color=(18, 20, 30))
draw = ImageDraw.Draw(base)
for i in range(0, W, 40): draw.line([(i, 0), (i, H)], fill=(30, 35, 45), width=1)
for j in range(0, H, 40): draw.line([(0, j), (W, j)], fill=(30, 35, 45), width=1)
draw.rectangle([10, 10, 590, 790], outline=(46, 204, 113), width=2)
draw.rectangle([20, 20, 580, 780], outline=(46, 204, 113), width=5)
draw.text((40, 60), "BK-GTA QUANTITATIVE SYSTEM", fill=(255, 255, 255))
draw.text((40, 150), f"TRADER ID: {ctx.author.name.upper()}", fill=(46, 204, 113))
draw.text((40, 260), f"ALPHA RETURN: +{ret}%", fill=(255, 255, 255))
draw.text((40, 360), f"SHARPE RATIO: {sharpe}", fill=(255, 255, 255))
draw.text((40, 460), f"SESSION: {SESSION_ID}", fill=(120, 120, 130))
draw.text((40, 720), "VERIFIED BY BK-GTA SENTINEL AI", fill=(46, 204, 113))
with io.BytesIO() as image_binary:
base.save(image_binary, 'PNG')
image_binary.seek(0)
await ctx.send(file=discord.File(fp=image_binary, filename=f"Viral_{ctx.author.id}.png"))
except Exception as e:
logger.error(f"Render Error: {e}")
await ctx.send(f"❌ 渲染引擎报错: `{e}`")
finally:
await m.delete()
# ==================================================================
# 模块七:每日门诊 (Daily Clinic / Sync Log)
# ==================================================================
@new_bot.command()
async def sync_log(ctx):
"""每日门诊:AI 深度心理建模,诊断报复性交易与情绪倾斜 (Tilt)"""
if not ctx.message.attachments:
return await ctx.send("❌ 请上传今日交易存根/截图进行心理诊断。")
m = await ctx.send("🏥 **[Clinic] 正在执行深度心理建模...**")
prompt = (
"你是一名顶级交易心理学专家与行为金融分析师。请分析此交易记录截图,执行以下诊断:\n"
"1. TILT_SCORE: 情绪倾斜指数 (0-10, 0=完全理性, 10=严重失控)\n"
"2. REVENGE_RISK: 报复性交易风险等级 (LOW/MEDIUM/HIGH/CRITICAL)\n"
"3. PATTERN: 识别的行为模式 (如: 恐惧离场, 过度交易, 追涨杀跌, 锚定偏差等)\n"
"4. DIAGNOSIS: 中文诊断报告 (100字内, 包含具体问题分析)\n"
"5. PRESCRIPTION: 中文处方建议 (50字内的具体行动指令)\n"
"输出格式必须严格如下:\n"
"TILT_SCORE: [值]\n"
"REVENGE_RISK: [等级]\n"
"PATTERN: [模式]\n"
"DIAGNOSIS: [诊断]\n"
"PRESCRIPTION: [处方]"
)
res = await call_gemini_multiplex(prompt, ctx.message.attachments[0])
if "ERROR" in res:
await m.delete()
return await ctx.send("🔴 [Clinic Offline] 诊断节点断连。")
tilt = safe_extract_float(res, "TILT_SCORE")
revenge = safe_extract_text(res, "REVENGE_RISK", "PATTERN")
pattern = safe_extract_text(res, "PATTERN", "DIAGNOSIS")
diagnosis = safe_extract_text(res, "DIAGNOSIS", "PRESCRIPTION")
prescription = safe_extract_text(res, "PRESCRIPTION")
if tilt <= 3:
color, status = 0x2ecc71, "🟢 心理状态健康"
elif tilt <= 6:
color, status = 0xf39c12, "🟡 轻度情绪偏移"
elif tilt <= 8:
color, status = 0xe67e22, "🟠 中度情绪倾斜"
else:
color, status = 0xe74c3c, "🔴 严重情绪失控"
emb = discord.Embed(title="🏥 BK-GTA Daily Clinic | 每日门诊", color=color)
emb.add_field(name="心理状态", value=f"**{status}**", inline=False)
emb.add_field(name="Tilt Index", value=f"**{tilt:.1f}** / 10", inline=True)
emb.add_field(name="报复性交易风险", value=f"**{revenge}**", inline=True)
emb.add_field(name="行为模式识别", value=f"`{pattern}`", inline=False)
emb.add_field(name="📋 诊断报告", value=f"```\n{diagnosis}\n```", inline=False)
emb.add_field(name="💊 处方", value=f"```\n{prescription}\n```", inline=False)
# 积分 + 阵营荣誉贡献
u = bk_engine.get_user(ctx.author.id)
u["points"] += 30
if u["faction"] and u["faction"] in bk_engine.data["factions"]:
bk_engine.data["factions"][u["faction"]]["score"] += 3
bk_engine.save_db()
faction_note = ""
if u["faction"]:
f_emoji = bk_engine.data["factions"].get(u["faction"], {}).get("emoji", "")
faction_note = f" | {f_emoji} Faction Honor +3"
emb.set_footer(text=f"Assets +30{faction_note} | Session: {SESSION_ID}")
await m.delete()
await ctx.send(embed=emb)
# ==================================================================
# 模块八:阵营系统 (Faction System) — 完整版
# ==================================================================
@new_bot.command()
async def factions(ctx):
"""查看所有阵营排名、荣誉条与实时对抗比分"""
factions_list = list(bk_engine.data["factions"].items())
factions_list.sort(key=lambda x: x[1]["score"], reverse=True)
max_score = factions_list[0][1]["score"] if factions_list else 1
emb = discord.Embed(title="⚔️ Faction War | 阵营对抗", color=0xf1c40f)
if not factions_list:
emb.description = "暂无阵营。管理员可通过 `!create_faction` 创建。"
else:
for rank_idx, (name, info) in enumerate(factions_list):
member_count = sum(
1 for u in bk_engine.data["users"].values()
if u.get("faction") == name
)
bar_len = int((info["score"] / max(max_score, 1)) * 10)
bar = "█" * bar_len + "░" * (10 - bar_len)
medal = "👑" if rank_idx == 0 else f"#{rank_idx + 1}"
emb.add_field(
name=f"{medal} {info['emoji']} {name}",
value=(
f"Honor: **{info['score']}** `{bar}`\n"
f"Wins: **{info['total_wins']}** | Members: **{member_count}**"
),
inline=False
)
emb.add_field(
name="📜 操作指南",
value=(
"`!join [阵营名]` — 加入阵营(不可更改)\n"
"`!faction_donate [数额]` — 捐献积分为阵营充能"
),
inline=False
)
emb.set_footer(text=f"Session: {SESSION_ID} | 荣誉来源: 对决/审计/门诊/捐献")
await ctx.send(embed=emb)
@new_bot.command()
async def join(ctx, faction_name: str):
"""加入阵营,绑定后不可更改"""
faction_name = faction_name.upper()
if faction_name not in bk_engine.data["factions"]:
available = ", ".join(bk_engine.data["factions"].keys())
return await ctx.send(f"❌ 无效阵营。可选: `{available}`")
u = bk_engine.get_user(ctx.author.id)
if u["faction"] is not None:
emoji = bk_engine.data["factions"].get(u["faction"], {}).get("emoji", "")
return await ctx.send(f"❌ 你已绑定阵营 {emoji} **{u['faction']}**,不可更改。")
u["faction"] = faction_name
bk_engine.save_db()
f_info = bk_engine.data["factions"][faction_name]
await ctx.send(
f"✅ {f_info['emoji']} **{ctx.author.name}** 已加入 **{faction_name}** 阵营!为荣誉而战!"
)
@new_bot.command()
async def faction_donate(ctx, amount: int):
"""将个人积分捐献给阵营荣誉池"""
u = bk_engine.get_user(ctx.author.id)
if u["faction"] is None:
return await ctx.send("❌ 你尚未加入任何阵营。请先 `!factions` 查看并 `!join` 加入。")
if amount <= 0 or u["points"] < amount:
return await ctx.send(f"❌ 积分不足或数额无效。你当前持有: **{u['points']}**")
faction = u["faction"]
if faction not in bk_engine.data["factions"]:
return await ctx.send("❌ 你的阵营已被解散,请联系管理员。")
u["points"] -= amount
bk_engine.data["factions"][faction]["score"] += amount
bk_engine.save_db()
emoji = bk_engine.data["factions"][faction]["emoji"]
await ctx.send(
f"🏛️ **{ctx.author.name}** 向 {emoji} **{faction}** 捐献了 **{amount}** 荣誉点!\n"
f"阵营总荣誉: **{bk_engine.data['factions'][faction]['score']}** | "
f"个人剩余积分: **{u['points']}**"
)
@new_bot.command()
@commands.has_permissions(administrator=True)
async def create_faction(ctx, name: str, emoji: str = "⚔️"):
"""管理员创建新阵营"""
name = name.upper()
if name in bk_engine.data["factions"]:
return await ctx.send(f"❌ 阵营 **{name}** 已存在。")
if len(name) > 20:
return await ctx.send("❌ 阵营名称不能超过 20 个字符。")
bk_engine.data["factions"][name] = {
"score": 0,
"emoji": emoji,
"total_wins": 0
}
bk_engine.save_db()
await ctx.send(
f"✅ 新阵营创建成功!\n"
f"{emoji} **{name}** 已加入阵营战争!\n"
f"成员可通过 `!join {name}` 加入。"
)
@new_bot.command()
@commands.has_permissions(administrator=True)
async def delete_faction(ctx, name: str):
"""管理员删除阵营(成员将变为无阵营状态)"""
name = name.upper()
if name not in bk_engine.data["factions"]:
available = ", ".join(bk_engine.data["factions"].keys())
return await ctx.send(f"❌ 阵营 **{name}** 不存在。\n当前阵营: `{available}`")
affected = 0
for uid, user_data in bk_engine.data["users"].items():
if user_data.get("faction") == name:
user_data["faction"] = None
affected += 1
emoji = bk_engine.data["factions"][name]["emoji"]
del bk_engine.data["factions"][name]
bk_engine.save_db()
await ctx.send(
f"🗑️ 阵营 {emoji} **{name}** 已解散。\n"
f"受影响成员: **{affected}** 人(已重置为无阵营状态,可重新 `!join`)。"
)
# ==================================================================
# 模块九:知识库治理 (Knowledge Archive)
# ==================================================================
@new_bot.command()
@commands.has_permissions(administrator=True)
async def archive(ctx, msg_id: int):
"""将优质案例入库至 #📚-case-study 频道永久保存"""
try:
source_msg = await ctx.channel.fetch_message(msg_id)
except discord.NotFound:
return await ctx.send("❌ 消息未找到,请确认 Message ID 正确。")
except Exception as e:
return await ctx.send(f"❌ 获取消息失败: `{e}`")
archive_channel = discord.utils.get(ctx.guild.text_channels, name="📚-case-study")
if not archive_channel:
archive_channel = discord.utils.get(ctx.guild.text_channels, name="case-study")
if not archive_channel:
return await ctx.send("❌ 未找到 `#📚-case-study` 或 `#case-study` 频道,请先创建。")
emb = discord.Embed(
title="📚 Archived Case Study",
description=source_msg.content[:4096] if source_msg.content else "(Embed/Attachment)",
color=0x9b59b6,
timestamp=source_msg.created_at
)
emb.add_field(name="Author", value=source_msg.author.mention, inline=True)
emb.add_field(name="Source", value=ctx.channel.mention, inline=True)
emb.add_field(name="Archived By", value=ctx.author.mention, inline=True)
if source_msg.embeds:
orig = source_msg.embeds[0]
if orig.description:
emb.add_field(name="Original Content", value=orig.description[:1024], inline=False)
files = []
for att in source_msg.attachments:
try:
file_data = await att.read()
files.append(discord.File(io.BytesIO(file_data), filename=att.filename))
except Exception:
pass
await archive_channel.send(embed=emb, files=files if files else None)
await ctx.send(f"✅ 案例已归档至 {archive_channel.mention}")
# ==================================================================
# 模块十:多语言镜像翻译 (Global Translate Engine)
# ==================================================================
@new_bot.command()
@commands.has_permissions(administrator=True)
async def global_translate(ctx, msg_id: int, lang: str = "EN"):
"""生成指定语言的镜像报告 (EN/JP/RU/KR/ES/FR/DE/AR/PT)"""
try:
source_msg = await ctx.channel.fetch_message(msg_id)
except discord.NotFound:
return await ctx.send("❌ 消息未找到。")
except Exception as e:
return await ctx.send(f"❌ 获取消息失败: `{e}`")
lang = lang.upper()
lang_map = {
"EN": "English", "JP": "Japanese", "RU": "Russian",
"KR": "Korean", "ES": "Spanish", "FR": "French",
"DE": "German", "AR": "Arabic", "PT": "Portuguese"
}
target_lang = lang_map.get(lang, lang)
content = source_msg.content or ""
if source_msg.embeds:
for emb in source_msg.embeds:
if emb.title: content += f"\n{emb.title}"
if emb.description: content += f"\n{emb.description}"
for field in emb.fields:
content += f"\n{field.name}: {field.value}"
if not content.strip():
return await ctx.send("❌ 消息内容为空,无法翻译。")
m = await ctx.send(f"🌐 **[Translator] Generating {target_lang} mirror...**")
prompt = (
f"你是专业金融翻译官。请将以下内容精准翻译为 {target_lang}。\n"
f"要求:保持金融术语专业性,保留所有数字和格式结构。\n"
f"原文:\n{content}"
)
res = await call_gemini_multiplex(prompt)
if "ERROR" in res:
await m.delete()
return await ctx.send("🔴 翻译引擎离线。")
result_emb = discord.Embed(
title=f"🌐 Mirror Report [{lang}] | {target_lang}",
description=res[:4096],
color=0x3498db
)
result_emb.set_footer(text=f"ZH → {target_lang} | Session: {SESSION_ID}")
await m.delete()
await ctx.send(embed=result_emb)
# ==================================================================
# 系统对账与自动化任务 (System Reliability)
# ==================================================================
@tasks.loop(hours=1)
async def auto_backup():
"""本地资产数据库每小时原子化备份"""
bk_engine.save_db()
logger.info("💾 [Checkpoint] Data persistence executed.")
@new_bot.event
async def on_ready():
if not auto_backup.is_running(): auto_backup.start()
activity = discord.Activity(type=discord.ActivityType.watching, name="HFT Markets")
await new_bot.change_presence(status=discord.Status.online, activity=activity)
logger.info(f"--------------------------------------------------")
logger.info(f"✅ [V62.9 Final] BK-GTA Architecture Online")
logger.info(f"🚀 Session: {SESSION_ID} | Commands: {len(new_bot.commands)}")
logger.info(f"📊 Factions: {len(bk_engine.data['factions'])} | Users: {len(bk_engine.data['users'])}")
logger.info(f"--------------------------------------------------")
@new_bot.event
async def on_disconnect():
logger.warning("⚠️ [Gateway] Disconnected. discord.py will auto-reconnect...")
@new_bot.event
async def on_resumed():
logger.info("✅ [Gateway] Session resumed successfully.")
@new_bot.event
async def on_command_error(ctx, error):
"""全局错误拦截器,严禁静默失效"""
if isinstance(error, commands.CommandNotFound): return
if isinstance(error, commands.MissingPermissions):
return await ctx.send("❌ 权限审计未通过:仅限合伙人执行。")
exc = "".join(traceback.format_exception(type(error), error, error.__traceback__))
logger.error(f"🔥 [System Crash] {exc}")
await ctx.send(f"⚠️ [Logic Breach] 内部逻辑断裂: `{error}`")
logger.info(f"📋 [Factory] Bot instance created with {len(new_bot.commands)} commands.")
return new_bot
# ==============================================================================
# 6. 物理点火启动链 (Physical Start Engine)
# ==============================================================================
bot = create_bot_with_commands()
if __name__ == "__main__":
logger.info("=" * 55)
logger.info(" BK-GTA V62.9 Final — HuggingFace Edition")
logger.info(f" Token: {'YES' if DISCORD_TOKEN else '!! MISSING !!'} ({len(DISCORD_TOKEN)} chars)")
logger.info(f" Gemini Keys: {len(KEY_POOL)}")
logger.info(f" DB Path: {DB_PATH}")
logger.info(f" Session: {SESSION_ID}")
logger.info("=" * 55)
# 1. 物理启动 Gradio 保活接口
keep_alive()
# 2. [HuggingFace 专用] 等待容器网络就绪
logger.info("⏳ [Boot] Waiting 10s for container network to initialize...")
time.sleep(10)
# 3. 逻辑启动 Discord Bot 核心
if not DISCORD_TOKEN:
logger.critical("❌ CRITICAL: DISCORD_TOKEN is missing in environment.")
while True:
time.sleep(3600)
else:
max_retries = 5
for attempt in range(1, max_retries + 1):
try:
if attempt > 1:
wait = min(30 * attempt, 120)
logger.info(f"⏳ [Boot] Retry in {wait}s (attempt {attempt}/{max_retries})...")
time.sleep(wait)
bot = create_bot_with_commands()
logger.info(f"🚀 [Boot] Starting Discord connection (attempt {attempt}/{max_retries})...")
bot.run(DISCORD_TOKEN)
break
except discord.LoginFailure:
logger.critical("❌ [FATAL] Invalid DISCORD_TOKEN! Check token in Secrets.")
break
except discord.PrivilegedIntentsRequired:
logger.critical(
"❌ [FATAL] Privileged Intents not enabled!\n"
"→ https://discord.com/developers/applications → Bot tab\n"
"→ Enable 'SERVER MEMBERS INTENT' and 'MESSAGE CONTENT INTENT'"
)
break
except (OSError, ConnectionError) as e:
logger.error(f"🌐 [Boot] Network error (attempt {attempt}): {e}")
except Exception as e:
logger.critical(f"🔥 ENGINE COLLAPSE (attempt {attempt}): {e}")
traceback.print_exc()
logger.warning("🔴 [Boot] Bot stopped. Keeping process alive for Gradio...")
while True:
time.sleep(3600)
# ==============================================================================
# END OF ARCHITECT SOURCE CODE | BK-GTA V62.9 Final — HuggingFace Edition
# ==============================================================================