Nine7 / app.py
ljx77qaq's picture
Rename app (7).py to app.py
2f9e681 verified
import socket
import time
import os
import json
import asyncio
import html
import re
import hmac
import hashlib
import queue
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from threading import Thread, Lock, Timer
from urllib.parse import parse_qs, unquote
from functools import wraps
import urllib3.util.connection as urllib3_cn
# 🌟 强制 IPv4
def allowed_gai_family():
return socket.AF_INET
urllib3_cn.allowed_gai_family = allowed_gai_family
import telebot
from telebot import types, apihelper
from telebot.apihelper import ApiTelegramException
from flask import Flask, request, jsonify, send_file, Response
from webdav4.client import Client as WebDAVClient
from telethon import TelegramClient
from telethon.sessions import StringSession
from telethon.extensions import html as tl_html
from apscheduler.schedulers.asyncio import AsyncIOScheduler
try:
from pypinyin import lazy_pinyin
except ImportError:
print("⚠️ 未安装 pypinyin")
# ===== 2. WebDAV 存储配置 =====
DAV_URL_BASE = os.environ.get("WEBDAV_URL", "").rstrip("/")
DAV_USER = os.environ.get("WEBDAV_USER") or os.environ.get("WEBDAV_USERNAME")
DAV_PASS = os.environ.get("WEBDAV_PASS") or os.environ.get("WEBDAV_PASSWORD")
DAV_SUB_PATH = os.environ.get("WEBDAV_PATH", "").strip("/")
FULL_WEBDAV_URL = f"{DAV_URL_BASE}/{DAV_SUB_PATH}/" if DAV_SUB_PATH else f"{DAV_URL_BASE}/"
REMOTE_FILENAME = "tg_bot_data_v5.json"
DATA = {"users": {}, "msg_map": {}, "backup_log": {}}
data_lock = Lock()
# 👇 增加内存级网页源码缓存字典
HTML_CACHE = {}
TL_LOOP = None
TL_CLIENT = None
SEPARATOR_MARK = "\n\n➖➖➖➖➖➖\n"
def get_dav_client():
return WebDAVClient(base_url=FULL_WEBDAV_URL, auth=(DAV_USER, DAV_PASS))
def load_data():
global DATA
try:
print(f"📡 正在尝试连接 WebDAV...")
client = get_dav_client()
if client.exists(REMOTE_FILENAME):
client.download_file(REMOTE_FILENAME, "temp_data.json")
with open("temp_data.json", "r") as f:
loaded_data = json.load(f)
DATA["users"] = loaded_data.get("users", {})
DATA["msg_map"] = loaded_data.get("msg_map", {})
DATA["backup_log"] = loaded_data.get("backup_log", {})
print(f"✅ 成功下载恢复云端数据!映射条数: {len(DATA['msg_map'])}")
else:
save_data(force=True)
except Exception as e: print(f"❌ WebDAV 初始化出错: {e}")
def save_data(force=True):
with data_lock:
try:
with open("temp_data.json", "w") as f: json.dump(DATA, f)
get_dav_client().upload_file("temp_data.json", REMOTE_FILENAME, overwrite=True)
print(f"☁️ [WebDAV] 数据已同步: {time.strftime('%H:%M:%S')}")
except Exception as e: print(f"❌ [WebDAV] 上传失败: {e}")
load_data()
# ===== 3. Telebot 主逻辑 =====
BOT_TOKEN = os.environ.get("BOT_TOKEN")
bot = telebot.TeleBot(BOT_TOKEN)
apihelper.API_URL = "https://nine7.linlizhi0210.workers.dev/bot{0}/{1}"
user_states = {}
ALL_TYPES = ['text', 'audio', 'document', 'photo', 'sticker', 'video', 'video_note', 'voice', 'location', 'contact', 'animation', 'dice', 'poll']
# ====== 频率限制 ======
_rate_limit = defaultdict(list)
RATE_LIMIT_MAX = 30
RATE_LIMIT_WINDOW = 60
def check_rate_limit(uid):
now = time.time()
_rate_limit[uid] = [t for t in _rate_limit[uid] if now - t < RATE_LIMIT_WINDOW]
if len(_rate_limit[uid]) >= RATE_LIMIT_MAX:
return True
_rate_limit[uid].append(now)
return False
# ====== SSE 实时推送队列 ======
_event_queues = defaultdict(lambda: queue.Queue(maxsize=50))
def push_event(uid, event_type, data):
q = _event_queues.get(uid)
if q:
try:
q.put_nowait({
"type": event_type,
"data": data,
"time": time.strftime("%H:%M:%S")
})
except queue.Full:
pass
try:
bot.set_my_commands([
types.BotCommand("start", "🤖 初始化控制台"),
types.BotCommand("channels", "📔 频道地址簿"),
types.BotCommand("btn_new", "🆕 发送带按钮消息"),
types.BotCommand("btn_old", "🔘 给旧消息加按钮"),
types.BotCommand("add_stat", "📈 创建统计任务"),
types.BotCommand("list_stat", "📊 管理统计任务"),
types.BotCommand("add_dir", "🗂️ 创建自动目录"),
types.BotCommand("list_dir", "📂 管理目录任务"),
types.BotCommand("dir", "🗂️ 生成手动目录"),
types.BotCommand("replace_tag", "🔄 批量替换标签"),
types.BotCommand("backup", "🚀 智能全量备份"),
types.BotCommand("add", "➕ 添加同步组"),
types.BotCommand("list", "📋 我的同步组"),
types.BotCommand("status", "⚙️ 存储状态")
])
except Exception: pass
media_group_cache = {}
mg_lock = Lock()
def send_channel_prompt(uid, text):
address_book = DATA["users"].get(uid, {}).get("address_book", {})
markup = None
if address_book:
markup = types.InlineKeyboardMarkup(row_width=2)
btns = [types.InlineKeyboardButton(f"📔 {name}", callback_data=f"selch_{cid}") for cid, name in address_book.items()]
markup.add(*btns)
bot.send_message(uid, text, reply_markup=markup, parse_mode="Markdown")
@bot.message_handler(commands=['start'])
def cmd_start(message):
uid = str(message.from_user.id)
if uid not in DATA["users"]:
DATA["users"][uid] = {"groups": [], "stats_tasks": [], "dir_tasks": [], "address_book": {}}
save_data()
bot.reply_to(message, f"🤖 控制台已就绪\n云端路径:`{FULL_WEBDAV_URL}`\n👇 请点击左下角 **Menu** 展开操作菜单。", parse_mode="Markdown")
@bot.message_handler(commands=['channels'])
def cmd_channels(message):
uid = str(message.from_user.id)
address_book = DATA["users"].get(uid, {}).get("address_book", {})
msg_text = "📔 **我的频道地址簿**\n\n"
if not address_book: msg_text += "暂无保存的频道。请点击下方添加。"
else:
for cid, name in address_book.items(): msg_text += f"▪️ **{name}**: `{cid}`\n"
markup = types.InlineKeyboardMarkup()
markup.add(types.InlineKeyboardButton("➕ 添加新频道", callback_data="ab_add"))
if address_book: markup.add(types.InlineKeyboardButton("🗑️ 删除频道", callback_data="ab_del_menu"))
bot.send_message(uid, msg_text, reply_markup=markup, parse_mode="Markdown")
@bot.message_handler(commands=['btn_new'])
def cmd_btn_new(message):
uid = str(message.from_user.id)
user_states[uid] = {"step": "WAIT_BTN_NEW_CH"}
send_channel_prompt(uid, "🆕 **发送带按钮的新消息**\n\n1️⃣ 请输入目标【频道 ID】(或点击下方地址簿选择):")
@bot.message_handler(commands=['btn_old'])
def cmd_btn_old(message):
uid = str(message.from_user.id)
user_states[uid] = {"step": "WAIT_BTN_OLD_CH"}
send_channel_prompt(uid, "🔘 **给旧消息加按钮 (或修改/删除)**\n\n1️⃣ 请输入目标【频道 ID】(或点击下方地址簿选择):")
@bot.message_handler(commands=['add', 'list', 'status', 'backup'])
def handle_basic_commands(message):
uid = str(message.from_user.id)
cmd = message.text.split()[0]
if cmd == '/add':
user_states[uid] = {"step": "WAIT_SRC"}
send_channel_prompt(uid, "请输入源频道 ID:")
elif cmd == '/list':
groups = DATA["users"].get(uid, {}).get("groups", [])
if not groups: return bot.send_message(uid, "暂无同步任务。")
for i, g in enumerate(groups): bot.send_message(uid, f"📦 组 {i+1}\n源: `{g['src']}`\n目: `{g['tgt']}`", reply_markup=types.InlineKeyboardMarkup().add(types.InlineKeyboardButton("🗑 删除", callback_data=f"del_{i}")))
elif cmd == '/status': bot.send_message(uid, f"⚙️ 状态\n映射条数: {len(DATA['msg_map'])}")
elif cmd == '/backup':
groups = DATA["users"].get(uid, {}).get("groups", [])
markup = types.InlineKeyboardMarkup(row_width=1)
for i, g in enumerate(groups): markup.add(types.InlineKeyboardButton(f"📦 同步组 {i+1} 备份", callback_data=f"bkp_g_{i}"))
markup.add(types.InlineKeyboardButton("✏️ 手动指定通道", callback_data="bkp_m"))
bot.send_message(uid, "🚀 **请选择智能备份通道:**", reply_markup=markup, parse_mode="Markdown")
@bot.message_handler(commands=['add_stat'])
def cmd_add_stat(message):
uid = str(message.from_user.id)
user_states[uid] = {"step": "WAIT_STAT_NAME"}
bot.send_message(uid, "📈 **创建自定义统计任务**\n\n1️⃣ 请给这个任务起个名称 (如: `日常早报统计`):", parse_mode="Markdown")
@bot.message_handler(commands=['list_stat'])
def cmd_list_stat(message):
uid = str(message.from_user.id)
stats = DATA["users"].get(uid, {}).get("stats_tasks", [])
if not stats: return bot.send_message(uid, "暂无统计任务。")
for i, t in enumerate(stats):
msg = (f"🎯 **任务**: `{t.get('task_name', '未命名')}`\n📌 **频道**: `{t.get('channel_id')}` | **消息**: `{t.get('msg_id')}`\n"
f"📊 **表头**: `{t.get('table_title', '未设置标题')}`\n🔑 **触发标签**: `{t.get('trigger_tag', '#未设置')}`\n"
f"🏆 **上榜名额**: 前 `{t.get('top_n', 10)}` 名\n⏱ **更新**: 每 `{t.get('interval', 60)}` 分钟\n⏳ **剩余**: `{t.get('duration', 7)}` 天\n"
f"🚫 **屏蔽名单**: `{', '.join(t.get('stats_blacklist', [])) or '无'}`\n"
f"📝 **屏蔽区标题**: `{t.get('blacklist_title', '🚫本月轮换限制:') or '无'}`")
markup = types.InlineKeyboardMarkup(row_width=2)
markup.add(types.InlineKeyboardButton("✏️ 频道 ID", callback_data=f"e_chid_{i}"), types.InlineKeyboardButton("✏️ 消息 ID", callback_data=f"e_msgid_{i}"))
markup.add(types.InlineKeyboardButton("✏️ 任务名", callback_data=f"e_name_{i}"), types.InlineKeyboardButton("✏️ 表头标题", callback_data=f"e_titl_{i}"))
markup.add(types.InlineKeyboardButton("✏️ 触发标签", callback_data=f"e_trig_{i}"), types.InlineKeyboardButton("🏆 上榜名额", callback_data=f"e_topn_{i}"))
markup.add(types.InlineKeyboardButton("⏱ 更新频率", callback_data=f"e_intv_{i}"), types.InlineKeyboardButton("⏳ 存活期限", callback_data=f"e_dura_{i}"))
markup.add(types.InlineKeyboardButton("🚫 加屏蔽", callback_data=f"e_sabl_{i}"), types.InlineKeyboardButton("✅ 删屏蔽", callback_data=f"e_srbl_{i}"))
markup.add(types.InlineKeyboardButton("📝 屏蔽区标题", callback_data=f"e_sblt_{i}"))
markup.add(types.InlineKeyboardButton("🗑️ 删除该任务", callback_data=f"d_s_{i}"))
bot.send_message(uid, msg, reply_markup=markup, parse_mode="Markdown")
@bot.message_handler(commands=['dir'])
def cmd_build_dir(message):
uid = str(message.from_user.id)
user_states[uid] = {"step": "WAIT_MANUAL_DIR_CH"}
send_channel_prompt(uid, "🗂️ **生成频道标签目录**\n\n请输入需要扫描的【频道 ID】(例如 `-10012345678`):")
@bot.message_handler(commands=['add_dir'])
def cmd_add_dir(message):
uid = str(message.from_user.id)
user_states[uid] = {"step": "WAIT_DIR_NAME"}
bot.send_message(uid, "🗂️ **创建自动更新目录任务**\n\n1️⃣ 请给任务起个名字 (如: `主频道自动目录`):", parse_mode="Markdown")
@bot.message_handler(commands=['list_dir'])
def cmd_list_dir(message):
uid = str(message.from_user.id)
dirs = DATA["users"].get(uid, {}).get("dir_tasks", [])
if not dirs: return bot.send_message(uid, "暂无自动目录任务。")
for i, t in enumerate(dirs):
msg = (f"🗂️ **任务**: `{t.get('task_name', '未命名')}`\n"
f"📌 **频道**: `{t.get('channel_id')}` | **承载消息**: `{t.get('msg_id')}`\n"
f"⏱ **频率**: 每 `{t.get('interval', 15)}` 分钟扫描一次\n"
f"🛡️ **屏蔽标签**: `{', '.join(t.get('blacklist', [])) or '无'}`\n"
f"📦 **已收录标签**: `{len(t.get('tags_cache', []))} 个`")
markup = types.InlineKeyboardMarkup(row_width=2)
markup.add(types.InlineKeyboardButton("➕ 加屏蔽", callback_data=f"ed_ab_{i}"), types.InlineKeyboardButton("➖ 删屏蔽", callback_data=f"ed_rb_{i}"))
markup.add(types.InlineKeyboardButton("⏱ 扫描频率", callback_data=f"ed_in_{i}"))
markup.add(types.InlineKeyboardButton("🗑️ 终止并删除该目录任务", callback_data=f"d_d_{i}"))
bot.send_message(uid, msg, reply_markup=markup, parse_mode="Markdown")
@bot.message_handler(commands=['replace_tag'])
def cmd_replace_tag(message):
uid = str(message.from_user.id)
user_states[uid] = {"step": "WAIT_REP_CH"}
send_channel_prompt(uid, "🔄 **批量替换/删除标签**\n\n1️⃣ 请输入需要处理的【频道 ID】:")
def process_user_text(uid, text):
if uid not in DATA["users"]:
DATA["users"][uid] = {"groups": [], "stats_tasks": [], "dir_tasks": [], "address_book": {}}
save_data()
if uid in user_states:
state = user_states[uid]
step = state["step"]
if step == "WAIT_BTN_NEW_CH":
user_states[uid].update({"step": "WAIT_BTN_NEW_TEXT", "ch_id": text})
bot.send_message(uid, "2️⃣ 请输入要发送的**消息正文** (支持 HTML/Markdown,不需要排版可直接打字):")
elif step == "WAIT_BTN_NEW_TEXT":
user_states[uid].update({"step": "WAIT_BTN_NEW_BTEXT", "msg_text": text})
bot.send_message(uid, "3️⃣ 请输入**按钮上显示的文字** (例如: `🔗 点击进入主群`):")
elif step == "WAIT_BTN_NEW_BTEXT":
user_states[uid].update({"step": "WAIT_BTN_NEW_URL", "btn_text": text})
bot.send_message(uid, "4️⃣ 请输入**按钮跳转的链接 URL** (必须以 http/https 开头):")
elif step == "WAIT_BTN_NEW_URL":
if not text.startswith("http"):
return bot.send_message(uid, "❌ 链接格式错误!必须以 http:// 或 https:// 开头。请重新输入:")
markup = types.InlineKeyboardMarkup()
markup.add(types.InlineKeyboardButton(text=state["btn_text"], url=text))
try:
bot.send_message(state["ch_id"], state["msg_text"], reply_markup=markup, parse_mode="HTML")
bot.send_message(uid, f"✅ 成功!消息已发送至频道 `{state['ch_id']}`")
except Exception as e:
try:
bot.send_message(state["ch_id"], state["msg_text"], reply_markup=markup)
bot.send_message(uid, f"✅ 成功!消息已作为纯文本发送至频道 `{state['ch_id']}`")
except Exception as e2:
bot.send_message(uid, f"❌ 发送失败,请检查频道ID和机器人的权限: {e2}")
user_states.pop(uid)
elif step == "WAIT_BTN_OLD_CH":
user_states[uid].update({"step": "WAIT_BTN_OLD_MSG", "ch_id": text})
bot.send_message(uid, "2️⃣ 请输入目标**消息的 ID 或者 链接**:")
elif step == "WAIT_BTN_OLD_MSG":
if text.startswith('http'): text = text.split('/')[-1]
if not text.isdigit(): return bot.send_message(uid, "❌ 格式错误!请输入纯数字 ID 或者包含 ID 的链接:")
user_states[uid].update({"step": "WAIT_BTN_OLD_BTEXT", "msg_id": int(text)})
bot.send_message(uid, "3️⃣ 请输入**按钮上显示的文字**:\n💡 如果你想**删除**该消息现有的按钮,请直接回复 `删除`")
elif step == "WAIT_BTN_OLD_BTEXT":
if text.strip() == "删除":
try:
bot.edit_message_reply_markup(chat_id=state["ch_id"], message_id=state["msg_id"], reply_markup=None)
bot.send_message(uid, f"✅ 成功!已移除了消息 `{state['msg_id']}` 上的所有按钮。")
except Exception as e:
bot.send_message(uid, f"❌ 操作失败: {e}")
user_states.pop(uid)
return
user_states[uid].update({"step": "WAIT_BTN_OLD_URL", "btn_text": text})
bot.send_message(uid, "4️⃣ 请输入**按钮跳转的链接 URL**:")
elif step == "WAIT_BTN_OLD_URL":
if not text.startswith("http"):
return bot.send_message(uid, "❌ 链接格式错误!必须以 http:// 或 https:// 开头。请重新输入:")
markup = types.InlineKeyboardMarkup()
markup.add(types.InlineKeyboardButton(text=state["btn_text"], url=text))
try:
bot.edit_message_reply_markup(chat_id=state["ch_id"], message_id=state["msg_id"], reply_markup=markup)
bot.send_message(uid, f"✅ 成功!已为消息 `{state['msg_id']}` 更新了按钮。")
except Exception as e:
bot.send_message(uid, f"❌ 操作失败: {e}")
user_states.pop(uid)
elif step == "WAIT_AB_ID":
user_states[uid].update({"step": "WAIT_AB_NAME", "ab_id": text})
bot.send_message(uid, "📝 请为这个频道输入一个**自定义备注名**:")
elif step == "WAIT_AB_NAME":
if "address_book" not in DATA["users"][uid]: DATA["users"][uid]["address_book"] = {}
DATA["users"][uid]["address_book"][state["ab_id"]] = text
save_data()
bot.send_message(uid, f"✅ 已将频道 `{state['ab_id']}` 保存为 **{text}**!")
user_states.pop(uid)
elif step == "WAIT_SRC":
user_states[uid] = {"step": "WAIT_TGT", "src": text}
send_channel_prompt(uid, "请输入目标频道 ID:")
elif step == "WAIT_TGT":
DATA["users"][uid]["groups"].append({"src": state["src"], "tgt": text})
save_data()
bot.send_message(uid, "✅ 转发配置完毕!")
user_states.pop(uid)
elif step == "WAIT_STAT_NAME":
user_states[uid].update({"step": "WAIT_STAT_CH", "task_name": text})
send_channel_prompt(uid, "2️⃣ 请输入**频道 ID**:")
elif step == "WAIT_STAT_CH":
user_states[uid].update({"step": "WAIT_STAT_MSG", "ch_id": text})
bot.send_message(uid, "3️⃣ 请输入**消息 ID 或者 消息链接**:")
elif step == "WAIT_STAT_MSG":
if text.startswith('http'): text = text.split('/')[-1]
if not text.isdigit(): return bot.send_message(uid, "❌ 格式错误!请重新输入消息 ID:")
user_states[uid].update({"step": "WAIT_STAT_TITLE", "msg_id": text})
bot.send_message(uid, "4️⃣ 请输入**统计表显示的标题**:")
elif step == "WAIT_STAT_TITLE":
user_states[uid].update({"step": "WAIT_STAT_TOPN", "table_title": text})
bot.send_message(uid, "5️⃣ 请输入**只显示前几名** (纯数字):")
elif step == "WAIT_STAT_TOPN":
if not text.isdigit(): return bot.send_message(uid, "❌ 只能输入纯数字!")
user_states[uid].update({"step": "WAIT_STAT_TRIGGER", "top_n": int(text)})
bot.send_message(uid, "6️⃣ 请输入**触发更新的自定义标签** (带 # 号):")
elif step == "WAIT_STAT_TRIGGER":
if not text.startswith('#'): return bot.send_message(uid, "❌ 必须以 # 开头!")
user_states[uid].update({"step": "WAIT_STAT_INTERVAL", "trigger_tag": text})
bot.send_message(uid, "7️⃣ 设置**更新频率** (分钟):")
elif step == "WAIT_STAT_INTERVAL":
if not text.isdigit(): return bot.send_message(uid, "❌ 只能输入纯数字!")
user_states[uid].update({"step": "WAIT_STAT_DURATION", "interval": int(text)})
bot.send_message(uid, "8️⃣ 设置**寿命期限** (天):")
elif step == "WAIT_STAT_DURATION":
if not text.isdigit(): return bot.send_message(uid, "❌ 只能输入纯数字!")
user_states[uid].update({"step": "WAIT_STAT_BLACKLIST", "duration": int(text)})
bot.send_message(uid, "9️⃣ 请输入**屏蔽名单** (用空格隔开)\n💡 不需要屏蔽请回复 `无`:")
elif step == "WAIT_STAT_BLACKLIST":
blacklist = [] if text.strip() == "无" else [x.strip() for x in re.split(r'[\s\n]+', text) if x.strip()]
user_states[uid].update({"step": "WAIT_STAT_BL_TITLE", "stats_blacklist": blacklist})
bot.send_message(uid, "🔟 请输入**屏蔽区的标题** (例如 `🚫本月轮换限制:`)\n💡 不需要请回复 `无`:")
elif step == "WAIT_STAT_BL_TITLE":
bl_title = "" if text.strip() == "无" else text.strip()
if "stats_tasks" not in DATA["users"][uid]: DATA["users"][uid]["stats_tasks"] = []
DATA["users"][uid]["stats_tasks"].append({
"task_name": state["task_name"], "channel_id": state["ch_id"], "msg_id": state["msg_id"],
"table_title": state["table_title"], "top_n": state["top_n"], "trigger_tag": state["trigger_tag"],
"interval": state["interval"], "duration": state["duration"], "start_time": int(time.time()),
"last_run": 0, "completed_items": [], "last_checked_msg_id": int(state["msg_id"]),
"stats_blacklist": state["stats_blacklist"],
"blacklist_title": bl_title
})
save_data()
bot.send_message(uid, "✅ 完美!任务已创建。")
user_states.pop(uid)
elif step.startswith("EDIT_STAT_"):
idx = state["idx"]
try:
task = DATA["users"][uid]["stats_tasks"][idx]
if step == "EDIT_STAT_NAME": task["task_name"] = text
elif step == "EDIT_STAT_TITL": task["table_title"] = text
elif step == "EDIT_STAT_TOPN": task["top_n"] = int(text)
elif step == "EDIT_STAT_INTV": task["interval"] = int(text)
elif step == "EDIT_STAT_DURA": task["duration"] = int(text)
elif step == "EDIT_STAT_CHID": task["channel_id"] = text
elif step == "EDIT_STAT_TRIG":
if not text.startswith('#'): return bot.send_message(uid, "❌ 必须以 # 开头!")
task["trigger_tag"] = text
elif step == "EDIT_STAT_MSGID":
if text.startswith('http'): text = text.split('/')[-1]
if not text.isdigit(): return bot.send_message(uid, "❌ 格式错误!")
task["msg_id"] = text
task['last_checked_msg_id'] = int(text)
task['completed_items'] = []
elif step == "EDIT_STAT_ADDBL":
task.setdefault("stats_blacklist", [])
new_items = [x.strip() for x in re.split(r'[\s\n]+', text) if x.strip()]
task["stats_blacklist"].extend(new_items)
task["stats_blacklist"] = list(set(task["stats_blacklist"]))
elif step == "EDIT_STAT_RMBL":
to_remove = [x.strip() for x in re.split(r'[\s\n]+', text) if x.strip()]
task["stats_blacklist"] = [x for x in task.get("stats_blacklist", []) if x not in to_remove]
elif step == "EDIT_STAT_BLTITLE":
task["blacklist_title"] = "" if text.strip() == "无" else text.strip()
task["last_html_stats"] = ""
task["last_run"] = 0
save_data()
bot.send_message(uid, "✅ 属性已修改!下次刷新周期将立即更新。")
except Exception as e: bot.send_message(uid, f"❌ 修改失败: {e}")
user_states.pop(uid)
elif step == "WAIT_BACKUP_SRC":
user_states[uid] = {"step": "WAIT_BACKUP_TGT", "src": text}
send_channel_prompt(uid, "📌 请输入【目标频道 ID】:")
elif step == "WAIT_BACKUP_TGT":
user_states[uid].update({"step": "WAIT_BACKUP_ID_M", "tgt": text})
bot.send_message(uid, "🚀 请粘贴源频道中最新那条消息的链接:")
elif step in ["WAIT_BACKUP_ID_G", "WAIT_BACKUP_ID_M"]:
try:
latest_id = int(text.strip().split('/')[-1])
if step == "WAIT_BACKUP_ID_G":
idx = state["g_idx"]
src = DATA["users"][uid]["groups"][idx]["src"]
tgt = DATA["users"][uid]["groups"][idx]["tgt"]
else:
src = state["src"]
tgt = state["tgt"]
Thread(target=run_smart_backup_v2, args=(latest_id, uid, src, tgt)).start()
user_states.pop(uid)
except Exception as e: bot.send_message(uid, f"❌ 链接解析失败: {e}")
elif step == "WAIT_MANUAL_DIR_CH":
bot.send_message(uid, "🔍 正在扫描频道历史标签,请耐心等待...")
Thread(target=generate_smart_directory, args=(uid, text)).start()
user_states.pop(uid)
elif step == "WAIT_DIR_NAME":
user_states[uid].update({"step": "WAIT_DIR_CH", "task_name": text})
send_channel_prompt(uid, "2️⃣ 请输入**频道 ID**:")
elif step == "WAIT_DIR_CH":
user_states[uid].update({"step": "WAIT_DIR_MSG", "ch_id": text})
bot.send_message(uid, "3️⃣ 请输入承载目录的**消息 ID**:")
elif step == "WAIT_DIR_MSG":
if text.startswith('http'): text = text.split('/')[-1]
user_states[uid].update({"step": "WAIT_DIR_BLACKLIST", "msg_id": text})
bot.send_message(uid, "4️⃣ 请输入需要**屏蔽的标签** (空格隔开,不屏蔽回复 `无`):")
elif step == "WAIT_DIR_BLACKLIST":
blacklist = [] if text.strip() == "无" else text.split()
if "dir_tasks" not in DATA["users"][uid]: DATA["users"][uid]["dir_tasks"] = []
DATA["users"][uid]["dir_tasks"].append({
"task_name": state["task_name"], "channel_id": state["ch_id"],
"msg_id": state["msg_id"], "blacklist": blacklist, "interval": 15,
"tags_cache": [], "tags_map": {}, "scanned_msgs": {}, "last_html_dir": ""
})
save_data()
bot.send_message(uid, "✅ 目录任务建立完成!")
user_states.pop(uid)
elif step.startswith("EDIT_DIR_"):
idx = state["idx"]
try:
task = DATA["users"][uid]["dir_tasks"][idx]
if step == "EDIT_DIR_ADDBL":
task["blacklist"].extend(text.split())
task["blacklist"] = list(set(task["blacklist"]))
elif step == "EDIT_DIR_RMBL":
to_rem = text.split()
task["blacklist"] = [t for t in task["blacklist"] if t not in to_rem]
elif step == "EDIT_DIR_INTV":
task["interval"] = int(text)
save_data()
bot.send_message(uid, "✅ 目录属性已修改!")
except Exception as e: bot.send_message(uid, f"❌ 修改失败: {e}")
user_states.pop(uid)
elif step == "WAIT_REP_CH":
user_states[uid].update({"step": "WAIT_REP_OLD", "ch_id": text})
bot.send_message(uid, "2️⃣ 请输入【要被替换的旧标签】(带 #):")
elif step == "WAIT_REP_OLD":
if not text.startswith('#'): return bot.send_message(uid, "❌ 必须以 # 开头!")
user_states[uid].update({"step": "WAIT_REP_NEW", "old_tag": text})
bot.send_message(uid, "3️⃣ 请输入【新标签】(想删除回复 `删除`):")
elif step == "WAIT_REP_NEW":
new_tag = "" if text == "删除" else text
ch_id = state["ch_id"]
old_tag = state["old_tag"]
bot.send_message(uid, f"🚀 开始批量替换...")
Thread(target=batch_replace_tags, args=(uid, ch_id, old_tag, new_tag)).start()
user_states.pop(uid)
else: bot.send_message(uid, "💡 请用 **Menu** 菜单选择指令。")
@bot.message_handler(func=lambda m: m.chat.type == 'private')
def handle_private(message):
uid = str(message.from_user.id)
text = message.text.strip()
process_user_text(uid, text)
@bot.callback_query_handler(func=lambda call: call.data.startswith("selch_"))
def handle_selch(call):
uid = str(call.from_user.id)
ch_id = call.data.split("_", 1)[1]
bot.send_message(uid, f"👉 已快捷选择频道: `{ch_id}`", parse_mode="Markdown")
process_user_text(uid, ch_id)
bot.answer_callback_query(call.id)
@bot.callback_query_handler(func=lambda call: call.data.startswith("bkp_"))
def handle_backup_callbacks(call):
uid = str(call.from_user.id)
if call.data == "bkp_m":
user_states[uid] = {"step": "WAIT_BACKUP_SRC"}
send_channel_prompt(uid, "✏️ **手动备份模式**\n\n请输入【源频道 ID】:")
elif call.data.startswith("bkp_g_"):
idx = int(call.data.split("_")[2])
user_states[uid] = {"step": "WAIT_BACKUP_ID_G", "g_idx": idx}
bot.edit_message_text(f"🚀 已选择 **组 {idx+1}**\n\n请粘贴源频道最新消息的链接:", call.message.chat.id, call.message.message_id, parse_mode="Markdown")
@bot.callback_query_handler(func=lambda call: True)
def handle_callbacks(call):
uid = str(call.from_user.id)
data = call.data
if data == "ab_add":
user_states[uid] = {"step": "WAIT_AB_ID"}
bot.send_message(uid, "📌 请输入要保存的**频道 ID**:")
return bot.answer_callback_query(call.id)
elif data == "ab_del_menu":
address_book = DATA["users"].get(uid, {}).get("address_book", {})
markup = types.InlineKeyboardMarkup(row_width=1)
for cid, name in address_book.items(): markup.add(types.InlineKeyboardButton(f"❌ 删除: {name}", callback_data=f"ab_del_{cid}"))
bot.edit_message_text("🗑️ 请选择要删除的频道:", call.message.chat.id, call.message.message_id, reply_markup=markup)
return bot.answer_callback_query(call.id)
elif data.startswith("ab_del_"):
cid = data.split("ab_del_")[1]
if "address_book" in DATA["users"][uid] and cid in DATA["users"][uid]["address_book"]:
del DATA["users"][uid]["address_book"][cid]
save_data()
bot.edit_message_text("✅ 频道已移除。", call.message.chat.id, call.message.message_id)
return bot.answer_callback_query(call.id)
elif data.startswith("selch_") or data.startswith("bkp_"): return
try:
action, idx_str = data.rsplit("_", 1)
idx = int(idx_str)
except ValueError: return
if action == "del":
del DATA["users"][uid]["groups"][idx]
bot.edit_message_text("❌ 转发任务已移除", call.message.chat.id, call.message.message_id)
save_data()
elif action == "d_s":
del DATA["users"][uid]["stats_tasks"][idx]
bot.edit_message_text("❌ 统计任务已移除", call.message.chat.id, call.message.message_id)
save_data()
elif action == "d_d":
del DATA["users"][uid]["dir_tasks"][idx]
bot.edit_message_text("❌ 目录任务已移除", call.message.chat.id, call.message.message_id)
save_data()
elif action in ["e_name", "e_titl", "e_trig", "e_topn", "e_intv", "e_dura", "e_chid", "e_msgid", "e_sabl", "e_srbl", "e_sblt", "ed_ab", "ed_rb", "ed_in"]:
prompt_map = {
"e_name": "📌 请输入新的任务名称:",
"e_titl": "📌 请输入新的表头标题:",
"e_trig": "📌 请输入新的触发标签(带#):",
"e_topn": "📌 请输入新的上榜名额(纯数字):",
"e_intv": "📌 请输入新的更新频率(分钟):",
"e_dura": "📌 请输入新的存活期限(天):",
"e_chid": "📌 请输入新的频道 ID:",
"e_msgid": "📌 请输入新的消息 ID 或链接:",
"e_sabl": "🚫 请输入要**屏蔽的名字** (空格隔开):",
"e_srbl": "✅ 请输入要**解除屏蔽的名字** (空格隔开):",
"e_sblt": "📝 请输入**屏蔽区的显示标题**\n(例如 `🚫本月轮换限制:`,回复 `无` 则不显示):",
"ed_ab": "📌 请输入要追加的屏蔽标签(空格隔开):",
"ed_rb": "📌 请输入要移出屏蔽的标签(空格隔开):",
"ed_in": "📌 请输入新的扫描频率(分钟):"
}
state_map = {
"e_name": "EDIT_STAT_NAME",
"e_titl": "EDIT_STAT_TITL",
"e_trig": "EDIT_STAT_TRIG",
"e_topn": "EDIT_STAT_TOPN",
"e_intv": "EDIT_STAT_INTV",
"e_dura": "EDIT_STAT_DURA",
"e_chid": "EDIT_STAT_CHID",
"e_msgid": "EDIT_STAT_MSGID",
"e_sabl": "EDIT_STAT_ADDBL",
"e_srbl": "EDIT_STAT_RMBL",
"e_sblt": "EDIT_STAT_BLTITLE",
"ed_ab": "EDIT_DIR_ADDBL",
"ed_rb": "EDIT_DIR_RMBL",
"ed_in": "EDIT_DIR_INTV"
}
user_states[uid] = {"step": state_map[action], "idx": idx}
bot.send_message(uid, prompt_map[action])
bot.answer_callback_query(call.id, "请在对话框输入新值")
def run_smart_backup_v2(latest_id, uid, src, tgt):
global TL_LOOP, TL_CLIENT
backup_key = f"{src}_to_{tgt}"
if backup_key not in DATA["backup_log"]: DATA["backup_log"][backup_key] = []
if not TL_LOOP or not TL_CLIENT: return bot.send_message(uid, "❌ 错误: Userbot 未启动。")
bot.send_message(uid, f"🔍 **[双擎同步 1/2]**\n正在扫描历史...")
async def fetch_valid_tasks():
messages = []
ref_msg = await TL_CLIENT.get_messages(int(src), ids=latest_id)
target_grouped_id = ref_msg.grouped_id if ref_msg else None
async for msg in TL_CLIENT.iter_messages(int(src)):
if msg.action is not None:
continue
if msg.id > latest_id:
if target_grouped_id and msg.grouped_id == target_grouped_id: messages.append(msg)
continue
messages.append(msg)
messages.reverse()
tasks, album_cache = [], {}
for msg in messages:
if msg.grouped_id:
if msg.grouped_id not in album_cache:
album_cache[msg.grouped_id] = []
tasks.append({'type': 'album', 'grouped_id': msg.grouped_id})
album_cache[msg.grouped_id].append(msg.id)
else: tasks.append({'type': 'single', 'id': msg.id})
return tasks, album_cache, len(messages)
try:
future = asyncio.run_coroutine_threadsafe(fetch_valid_tasks(), TL_LOOP)
tasks, album_cache, total_valid = future.result(timeout=300)
except Exception as e: return bot.send_message(uid, f"❌ 扫描失败: {e}")
bot.send_message(uid, f"✅ 锁定 **{total_valid}** 条有效消息。\n🚀 **[双擎同步 2/2]**\n开始搬运...")
success = 0
failed = 0
failed_ids = []
for task in tasks:
msg_ids_to_copy = album_cache[task['grouped_id']] if task['type'] == 'album' else [task['id']]
if any(m_id in DATA["backup_log"][backup_key] for m_id in msg_ids_to_copy): continue
while True:
try:
if task['type'] == 'album': bot.copy_messages(tgt, src, msg_ids_to_copy)
else: bot.copy_message(tgt, src, msg_ids_to_copy[0])
DATA["backup_log"][backup_key].extend(msg_ids_to_copy)
success += len(msg_ids_to_copy)
time.sleep(3.0 if task['type'] == 'album' else 1.5)
if success % 20 == 0: save_data()
break
except ApiTelegramException as e:
if e.error_code == 429:
time.sleep(e.result_json.get('parameters', {}).get('retry_after', 10))
else:
failed += len(msg_ids_to_copy)
failed_ids.extend(msg_ids_to_copy)
break
except Exception:
failed += len(msg_ids_to_copy)
failed_ids.extend(msg_ids_to_copy)
break
save_data()
report = f"🏁 **备份完成!**\n源: `{src}` ➡️ 目: `{tgt}`\n✅ 新增 **{success}** 条"
if failed > 0:
report += f"\n❌ 失败 **{failed}** 条"
show_ids = failed_ids[:10]
report += f"\n失败消息 ID: `{show_ids}`"
if len(failed_ids) > 10:
report += f"\n... 等共 {len(failed_ids)} 条"
bot.send_message(uid, report)
push_event(uid, "backup_done", f"✅ 备份完成,新增 {success} 条,失败 {failed} 条")
def generate_smart_directory(uid, ch_id):
global TL_LOOP, TL_CLIENT
if not TL_LOOP or not TL_CLIENT: return bot.send_message(uid, "❌ 错误: Userbot 未启动。")
async def scan_and_build():
tags_set = set()
try:
async for msg in TL_CLIENT.iter_messages(int(ch_id)):
if msg.raw_text and '#' in msg.raw_text:
if not msg.entities: clean_text = msg.raw_text
else:
html_text = tl_html.unparse(msg.raw_text, msg.entities)
clean_text = html.unescape(re.sub(r'<.*?>', '', re.sub(r'<blockquote.*?>.*?</blockquote>', '', html_text, flags=re.DOTALL)))
for t in re.findall(r'#[A-Za-z0-9_\u4e00-\u9fa5]+', clean_text): tags_set.add(t)
except Exception as e: return None, str(e)
if not tags_set: return None, "没有找到有效标签。"
directory_map = {}
for tag in tags_set:
clean_str = tag[1:]
if not clean_str: continue
fc = clean_str[0]
key = "#"
if fc.isalpha() and fc.isascii(): key = fc.upper()
elif fc.isdigit(): key = "0-9"
elif '\u4e00' <= fc <= '\u9fff':
try:
py = lazy_pinyin(fc)
if py and len(py[0])>0: key = py[0][0].upper()
except NameError: key = "中文"
if key not in directory_map: directory_map[key] = []
directory_map[key].append(tag)
return directory_map, None
try:
future = asyncio.run_coroutine_threadsafe(scan_and_build(), TL_LOOP)
directory_map, err = future.result(timeout=600)
except Exception as e: return bot.send_message(uid, f"❌ 扫描崩溃: {e}")
if err: return bot.send_message(uid, f"❌ 扫描失败: {err}")
lines = ["目录:\n<blockquote expandable>"]
keys = sorted(directory_map.keys())
if "0-9" in keys: keys.remove("0-9"); keys.insert(0, "0-9")
for key in keys:
tags_line = " ".join([html.escape(t) for t in sorted(directory_map[key])])
lines.append(f"{key}: {tags_line}\n")
lines.append("</blockquote>")
final_text = "\n".join(lines)
if len(final_text) > 4000:
bot.send_message(uid, "⚠️ 目录太长已截断。")
final_text = final_text[:4000] + "\n... </blockquote>"
try:
bot.send_message(uid, final_text, parse_mode="HTML")
bot.send_message(uid, "✅ 手动目录生成完毕!")
push_event(uid, "dir_done", "✅ 手动目录生成完毕")
except Exception as e: bot.send_message(uid, f"❌ 发送失败: {e}")
def batch_replace_tags(uid, ch_id, old_tag, new_tag):
global TL_LOOP, TL_CLIENT
if not TL_LOOP or not TL_CLIENT: return bot.send_message(uid, "❌ 错误: Userbot 未启动。")
async def do_replace():
success = 0
try:
pattern = re.compile(re.escape(old_tag) + r'(?![A-Za-z0-9_\u4e00-\u9fa5])')
async for msg in TL_CLIENT.iter_messages(int(ch_id)):
if msg.raw_text and old_tag in msg.raw_text:
html_text = tl_html.unparse(msg.raw_text, msg.entities)
if pattern.search(html_text):
new_html_text = pattern.sub(new_tag, html_text)
while True:
try:
if msg.photo or msg.video or msg.document: bot.edit_message_caption(caption=new_html_text, chat_id=ch_id, message_id=msg.id, parse_mode="HTML")
else: bot.edit_message_text(text=new_html_text, chat_id=ch_id, message_id=msg.id, parse_mode="HTML")
success += 1
await asyncio.sleep(2)
break
except ApiTelegramException as e:
if e.error_code == 429: await asyncio.sleep(e.result_json.get('parameters', {}).get('retry_after', 10))
else: break
except Exception: break
except Exception as e: return success, str(e)
return success, None
try:
future = asyncio.run_coroutine_threadsafe(do_replace(), TL_LOOP)
success, err = future.result(timeout=3600)
except Exception as e: return bot.send_message(uid, f"❌ 任务崩溃: {e}")
if err: bot.send_message(uid, f"⚠️ 有警告: {err}\n✅ 仍修改了 **{success}** 条消息!")
else: bot.send_message(uid, f"🏁 **修改完成!**\n修改了 **{success}** 条历史消息!")
push_event(uid, "replace_done", f"✅ 标签替换完成,修改 {success} 条")
def process_media_group(mg_id, targets):
with mg_lock:
if mg_id not in media_group_cache: return
msgs = media_group_cache.pop(mg_id)
msgs.sort(key=lambda x: x.message_id)
for tgt in targets:
media_list = []
for m in msgs:
is_spoiler = getattr(m, 'has_media_spoiler', False)
if m.photo: media_list.append(types.InputMediaPhoto(m.photo[-1].file_id, caption=m.caption, caption_entities=m.caption_entities, has_spoiler=is_spoiler))
elif m.video: media_list.append(types.InputMediaVideo(m.video.file_id, caption=m.caption, caption_entities=m.caption_entities, has_spoiler=is_spoiler))
elif m.document: media_list.append(types.InputMediaDocument(m.document.file_id, caption=m.caption, caption_entities=m.caption_entities))
elif m.audio: media_list.append(types.InputMediaAudio(m.audio.file_id, caption=m.caption, caption_entities=m.caption_entities))
if not media_list: continue
try:
sent_msgs = bot.send_media_group(tgt, media_list)
for src_m, sent_m in zip(msgs, sent_msgs):
src_mid = str(src_m.message_id)
if src_mid not in DATA["msg_map"]: DATA["msg_map"][src_mid] = {}
DATA["msg_map"][src_mid][tgt] = sent_m.message_id
except Exception: pass
save_data()
@bot.channel_post_handler(func=lambda m: True, content_types=ALL_TYPES)
def handle_post(message):
src_id = str(message.chat.id)
targets = [g["tgt"] for u in DATA["users"].values() for g in u["groups"] if str(g["src"]) == src_id]
if not targets: return
if message.media_group_id:
mg_id = message.media_group_id
with mg_lock:
if mg_id not in media_group_cache:
media_group_cache[mg_id] = []
Timer(2.0, process_media_group, args=(mg_id, targets)).start()
media_group_cache[mg_id].append(message)
return
src_mid = str(message.message_id)
if src_mid not in DATA["msg_map"]: DATA["msg_map"][src_mid] = {}
for tgt in targets:
try:
copied = bot.copy_message(tgt, message.chat.id, message.message_id)
DATA["msg_map"][src_mid][tgt] = copied.message_id
except Exception: pass
save_data()
@bot.edited_channel_post_handler(func=lambda m: True, content_types=ALL_TYPES)
def handle_edit(message):
src_mid = str(message.message_id)
if src_mid in DATA["msg_map"]:
for tgt, dst_id in DATA["msg_map"][src_mid].items():
try:
if message.content_type == 'text': bot.edit_message_text(message.text, tgt, dst_id, entities=message.entities)
else: bot.edit_message_caption(caption=message.caption, chat_id=tgt, message_id=dst_id, caption_entities=message.caption_entities)
except Exception: pass
# ====== Telethon 后台引擎 ======
def start_telethon_worker():
global TL_LOOP, TL_CLIENT
api_id_str = os.environ.get("API_ID")
api_hash = os.environ.get("API_HASH")
user_session = os.environ.get("USER_SESSION")
if not api_id_str or not user_session:
print("⚠️ 未检测到 USER_SESSION,双擎模块跳过。")
return
TL_LOOP = asyncio.new_event_loop()
asyncio.set_event_loop(TL_LOOP)
TL_CLIENT = TelegramClient(StringSession(user_session), int(api_id_str), api_hash)
async def update_channel_msg():
current_time = int(time.time())
data_changed = False
for uid, u_data in DATA.get("users", {}).items():
tasks = u_data.get("stats_tasks", [])
for i in range(len(tasks) - 1, -1, -1):
task = tasks[i]
ch_id = int(task['channel_id'])
try: msg_id = int(str(task['msg_id']).split('/')[-1])
except ValueError: continue
table_title = task.get('table_title', '📊 **热评榜**')
top_n = int(task.get('top_n', 10))
trigger_tag = task.get('trigger_tag', '#未设置')
completed_items = []
interval_sec = int(task.get('interval', 60)) * 60
if current_time > int(task.get('start_time', current_time)) + int(task.get('duration', 7)) * 86400:
del tasks[i]; data_changed = True; continue
if current_time - int(task.get('last_run', 0)) < interval_sec: continue
try:
original_msg = await TL_CLIENT.get_messages(ch_id, ids=msg_id)
if not original_msg: continue
if original_msg.raw_text:
raw_html = tl_html.unparse(original_msg.raw_text, original_msg.entities)
base_html = raw_html.split("➖➖➖➖➖➖")[0].rstrip() if "➖➖➖➖➖➖" in raw_html else raw_html.rstrip()
else: base_html = ""
comments_data_list = []
discussion_chat_id = None
thread_id = None
async for comment in TL_CLIENT.iter_messages(ch_id, reply_to=msg_id):
if not discussion_chat_id:
discussion_chat_id = comment.chat_id
if comment.reply_to:
thread_id = comment.reply_to.reply_to_top_id or comment.reply_to.reply_to_msg_id
if comment.reactions:
total_reacts = sum(r.count for r in comment.reactions.results)
if total_reacts > 0:
full_raw_text = (comment.raw_text if comment.raw_text else "[图片]").replace('\n', ' ')
short_text = full_raw_text
if len(short_text) > 15: short_text = short_text[:14] + "…"
comments_data_list.append((total_reacts, html.escape(short_text), short_text, full_raw_text, comment.id))
# ====== 屏蔽名单与区域构建 ======
stats_blacklist = task.get('stats_blacklist', [])
blacklist_section = ""
if stats_blacklist:
bl_title = task.get('blacklist_title', '🚫本月轮换限制:')
bl_names = "\n".join([html.escape(n) for n in stats_blacklist])
if bl_title:
blacklist_section = f"\n{html.escape(bl_title)}\n<blockquote>{bl_names}</blockquote>\n"
else:
blacklist_section = f"\n<blockquote>{bl_names}</blockquote>\n"
stats_section = ""
all_comments_for_file = []
if comments_data_list:
comments_data_list.sort(key=lambda x: x[0], reverse=True)
deduped_comments = []
for item in comments_data_list:
total, safe_text, raw_short, full_raw, c_id = item
base_name = re.split(r'[((]', full_raw)[0].strip()
is_blocked = False
for blocked in stats_blacklist:
if blocked and len(blocked) >= 2:
if blocked in full_raw or blocked in base_name:
is_blocked = True
break
if is_blocked:
continue
conflict = False
for added_item in deduped_comments:
added_full_raw = added_item[3]
added_base = re.split(r'[((]', added_full_raw)[0].strip()
if (len(base_name) >= 2 and base_name in added_full_raw) or (len(added_base) >= 2 and added_base in full_raw):
conflict = True; break
if not conflict: deduped_comments.append(item)
comments_data_list = deduped_comments
completed_items = []
try:
async for newer_msg in TL_CLIENT.iter_messages(ch_id, limit=100, min_id=msg_id):
if newer_msg.id == msg_id: continue
if not newer_msg.raw_text or trigger_tag.lower() not in newer_msg.raw_text.lower(): continue
if newer_msg.entities:
html_text = tl_html.unparse(newer_msg.raw_text, newer_msg.entities)
text_no_bq = re.sub(r'<blockquote.*?>.*?</blockquote>', '', html_text, flags=re.DOTALL)
clean_text = html.unescape(re.sub(r'<.*?>', '', text_no_bq))
else:
clean_text = newer_msg.raw_text
if trigger_tag.lower() not in clean_text.lower(): continue
found_tags = re.findall(r'#([A-Za-z0-9_\u4e00-\u9fa5]+)', clean_text)
for item in comments_data_list:
raw_short = item[2]
full_raw = item[3]
base_name = re.split(r'[((]', full_raw)[0].strip()
for tag in found_tags:
if (len(tag) >= 2 and tag.lower() in full_raw.lower()) or (len(base_name) >= 2 and base_name.lower() in tag.lower()):
if raw_short not in completed_items: completed_items.append(raw_short)
break
except Exception: pass
if task.get('completed_items') != completed_items:
task['completed_items'] = completed_items; data_changed = True
comments_data_list.sort(key=lambda x: (x[2] in completed_items, x[0]), reverse=True)
all_comments_for_file = comments_data_list.copy()
comments_data_list = comments_data_list[:top_n]
completed_count = sum(1 for item in comments_data_list if item[2] in completed_items)
max_digits = max([len(str(item[0])) for item in comments_data_list] + [1])
inner_lines = []
for rank, item in enumerate(comments_data_list):
total, safe_text, raw_short, full_raw, c_id = item
medal = "🥇" if rank == 0 else "🥈" if rank == 1 else "🥉" if rank == 2 else "🔸"
display_text = f"<s>{safe_text}</s>" if raw_short in completed_items else safe_text
padded_total = str(total).rjust(max_digits, ' ')
inner_lines.append(f"{medal} <code>{padded_total}</code> 赞 | <i>{display_text}</i>")
beijing_tz = timezone(timedelta(hours=8))
now_str = datetime.now(beijing_tz).strftime("%m-%d %H:%M")
inner_lines.append(f"\n⏳ <code>最后更新: {now_str} (北京时间)</code>")
# ====== 🌟 核心:将榜单存储为网页,并把链接拼接到标题旁边 ======
if all_comments_for_file:
c_chat_str = str(discussion_chat_id).replace("-100", "") if discussion_chat_id else ""
list_html = ""
for rank, item in enumerate(all_comments_for_file, 1):
total, safe_text, raw_short, full_raw, c_id = item
# 结合原生协议与 thread 参数,无缝秒开悬浮评论区!
if c_chat_str and thread_id:
link = f"tg://privatepost?channel={c_chat_str}&post={c_id}&thread={thread_id}"
elif c_chat_str:
link = f"tg://privatepost?channel={c_chat_str}&post={c_id}"
else:
link = "#"
# ✨ 优化1:已完成的名字加删除线和变灰,未完成的正常显示
display_name = html.escape(full_raw)
if raw_short in completed_items:
display_name = f"<s style='opacity: 0.5;'>{display_name}</s>"
# ✨ 优化2:去掉了中间的 status 状态框和最后的 🔗 图标,保持极致干净
list_html += f'<a href="{link}" class="item"><span class="rank">#{rank}</span><span class="name">{display_name}</span><span class="reacts">{total} 赞</span></a>'
html_template = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>完整榜单 - {html.escape(table_title)}</title>
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; background: #1a1a2e; color: #eaeaea; padding: 15px; margin: 0; }}
.container {{ max-width: 600px; margin: 0 auto; background: #16213e; border-radius: 12px; padding: 15px; box-shadow: 0 4px 15px rgba(0,0,0,0.3); }}
h2 {{ text-align: center; color: #fff; margin-bottom: 20px; font-size: 18px; border-bottom: 1px solid rgba(255,255,255,0.1); padding-bottom: 10px; line-height: 1.5; }}
.hint {{ text-align: center; color: #5dade2; font-size: 13px; margin-top: -10px; margin-bottom: 15px; font-weight: 500; }}
.item {{ display: flex; justify-content: space-between; align-items: center; padding: 12px 0; border-bottom: 1px solid rgba(255,255,255,0.05); text-decoration: none; color: inherit; transition: background 0.2s; }}
.item:last-child {{ border-bottom: none; }}
.item:active {{ background: rgba(255,255,255,0.05); border-radius: 8px; }}
.rank {{ font-weight: bold; width: 35px; color: #e94560; font-size: 14px; }}
.name {{ flex: 1; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; padding: 0 10px; font-size: 14px; }}
.reacts {{ font-weight: bold; color: #5dade2; font-size: 14px; }}
</style>
</head>
<body>
<div class="container">
<h2>📊 {html.escape(table_title)} <br><span style="font-size:12px;color:#8a8a9a;font-weight:normal">全量数据收录 | 更新于 {now_str}</span></h2>
<div class="hint">💡 点击名字跳转评论投票</div>
{list_html}
</div>
</body>
</html>"""
# 将生成的网页存入缓存字典中,使用 频道ID_消息ID 作为唯一键
cache_key = f"{ch_id}_{msg_id}"
HTML_CACHE[cache_key] = html_template
# 优先使用默认你的自定义域名,如果没有设置,则退回 HF 原生域名
space_host = "bangdan.nine7.cc.cd"
file_msg_link = f"https://{space_host}/list/{cache_key}"
stats_section = f"<b>{html.escape(table_title)} ({completed_count}/{top_n}) <a href='{file_msg_link}'>完整名单</a></b>\n<blockquote>{chr(10).join(inner_lines)}</blockquote>"
else:
beijing_tz = timezone(timedelta(hours=8))
now_str = datetime.now(beijing_tz).strftime("%m-%d %H:%M")
stats_section = f"<b>{html.escape(table_title)} (0/{top_n})</b>\n<blockquote>暂无评论数据\n\n⏳ <code>最后更新: {now_str} (北京时间)</code></blockquote>"
new_message_text = f"{base_html}{SEPARATOR_MARK}{blacklist_section}{stats_section}"
content_hash = f"{blacklist_section}|{stats_section}"
if task.get('last_html_stats') != content_hash:
try:
if original_msg.photo or original_msg.video or original_msg.document: bot.edit_message_caption(caption=new_message_text, chat_id=ch_id, message_id=msg_id, parse_mode="HTML")
else: bot.edit_message_text(text=new_message_text, chat_id=ch_id, message_id=msg_id, parse_mode="HTML")
task['last_html_stats'] = content_hash; data_changed = True
except Exception: pass
task['last_run'] = current_time; data_changed = True; await asyncio.sleep(2)
except Exception as e:
pass
if data_changed: await asyncio.to_thread(save_data)
async def update_channel_dirs():
current_time = int(time.time())
data_changed = False
for uid, u_data in DATA.get("users", {}).items():
tasks = u_data.get("dir_tasks", [])
for task in tasks:
ch_id = int(task['channel_id'])
msg_id = int(task['msg_id'])
blacklist = task.get('blacklist', [])
tags_map = task.get('tags_map', {})
scanned_msgs = task.get('scanned_msgs', {})
interval_sec = int(task.get('interval', 15)) * 60
if current_time - int(task.get('last_run', 0)) < interval_sec: continue
is_first_run = not bool(scanned_msgs)
scan_kwargs = {'limit': None if is_first_run else 150}
new_tags_found = False
try:
original_msg = await TL_CLIENT.get_messages(ch_id, ids=msg_id)
if not original_msg: continue
if original_msg.raw_text:
raw_html = tl_html.unparse(original_msg.raw_text, original_msg.entities)
base_html = raw_html.split("➖➖➖➖➖➖")[0].rstrip() if "➖➖➖➖➖➖" in raw_html else raw_html.rstrip()
else: base_html = ""
except Exception: continue
try:
async for msg in TL_CLIENT.iter_messages(ch_id, **scan_kwargs):
if msg.id == msg_id: continue
msg_id_str = str(msg.id)
msg_time = msg.edit_date.timestamp() if msg.edit_date else msg.date.timestamp()
if scanned_msgs.get(msg_id_str) == msg_time: continue
scanned_msgs[msg_id_str] = msg_time
data_changed = True
found_tags = []
if msg.raw_text and '#' in msg.raw_text:
if not msg.entities: clean_text = msg.raw_text
else:
html_text = tl_html.unparse(msg.raw_text, msg.entities)
text_without_bq = re.sub(r'<blockquote.*?>.*?</blockquote>', '', html_text, flags=re.DOTALL)
clean_text = html.unescape(re.sub(r'<.*?>', '', text_without_bq))
found_tags = re.findall(r'#[A-Za-z0-9_\u4e00-\u9fa5]+', clean_text)
if found_tags:
tags_map[msg_id_str] = found_tags
new_tags_found = True
else:
if msg_id_str in tags_map:
del tags_map[msg_id_str]
new_tags_found = True
task['last_run'] = current_time
if new_tags_found or is_first_run:
task['tags_map'] = tags_map
task['scanned_msgs'] = scanned_msgs
data_changed = True
all_tags = set()
for t_list in tags_map.values(): all_tags.update(t_list)
active_tags = [t for t in all_tags if t not in blacklist]
task['tags_cache'] = active_tags
directory_map = {}
for tag in active_tags:
clean_str = tag[1:]
if not clean_str: continue
fc = clean_str[0]
key = "#"
if fc.isalpha() and fc.isascii(): key = fc.upper()
elif fc.isdigit(): key = "0-9"
elif '\u4e00' <= fc <= '\u9fff':
try:
py = lazy_pinyin(fc)
if py and len(py[0])>0: key = py[0][0].upper()
except NameError: key = "中文"
if key not in directory_map: directory_map[key] = []
directory_map[key].append(tag)
lines = ["目录:\n<blockquote expandable>"]
keys = sorted(directory_map.keys())
if "0-9" in keys: keys.remove("0-9"); keys.insert(0, "0-9")
for key in keys:
tags_line = " ".join([html.escape(t) for t in sorted(directory_map[key])])
lines.append(f"{key}: {tags_line}\n")
lines.append("</blockquote>")
beijing_tz = timezone(timedelta(hours=8))
now_str = datetime.now(beijing_tz).strftime("%m-%d %H:%M")
lines.append(f"\n⏳ <code>最后更新: {now_str} (北京时间)</code>")
stats_text = f"<blockquote>{chr(10).join(lines)}</blockquote>"
task_name = task.get('task_name', '标签目录')
safe_title_with_count = f"{html.escape(task_name)} ({len(active_tags)})"
new_message_text = f"{base_html}{SEPARATOR_MARK}<b>{safe_title_with_count}</b>\n{stats_text}"
if len(new_message_text) > 4000: new_message_text = new_message_text[:4000] + "\n... </blockquote>\n⚠️ 目录过长已截断"
if task.get('last_html_dir') != stats_text:
try:
if original_msg.photo or original_msg.video or original_msg.document: bot.edit_message_caption(caption=new_message_text, chat_id=ch_id, message_id=msg_id, parse_mode="HTML")
else: bot.edit_message_text(text=new_message_text, chat_id=ch_id, message_id=msg_id, parse_mode="HTML")
task['last_html_dir'] = stats_text
data_changed = True
except Exception: pass
except Exception: pass
if data_changed: await asyncio.to_thread(save_data)
TL_CLIENT.start()
scheduler = AsyncIOScheduler(event_loop=TL_LOOP)
scheduler.add_job(update_channel_msg, 'interval', seconds=10)
scheduler.add_job(update_channel_dirs, 'interval', seconds=15)
scheduler.start()
print("🚀 Telethon [全能排版+智能双擎后台] 已启动!")
TL_LOOP.run_until_complete(TL_CLIENT.run_until_disconnected())
# ====== Flask + WebApp API ======
app = Flask(__name__)
@app.after_request
def add_security_headers(response):
response.headers['X-Frame-Options'] = 'ALLOW-FROM https://web.telegram.org'
response.headers['Content-Security-Policy'] = "frame-ancestors 'self' https://web.telegram.org https://*.telegram.org;"
return response
def validate_webapp(req):
init_data = req.headers.get('X-Init-Data', '')
if not init_data:
return None
try:
parsed = {}
for part in init_data.split('&'):
if '=' in part:
k, v = part.split('=', 1)
parsed[k] = unquote(v)
check_hash = parsed.pop('hash', None)
if not check_hash or not BOT_TOKEN:
return None
data_check_string = "\n".join(f"{k}={parsed[k]}" for k in sorted(parsed.keys()))
secret_key = hmac.new(b"WebAppData", BOT_TOKEN.encode(), hashlib.sha256).digest()
computed = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest()
if not hmac.compare_digest(computed, check_hash):
print("⛔ HMAC 签名不匹配")
return None
auth_date = int(parsed.get('auth_date', 0))
if abs(time.time() - auth_date) > 86400:
print("⛔ initData 已过期")
return None
user_obj = json.loads(parsed.get('user', '{}'))
uid = str(user_obj.get('id', ''))
if uid and uid not in DATA["users"]:
DATA["users"][uid] = {"groups": [], "stats_tasks": [], "dir_tasks": [], "address_book": {}}
save_data()
return uid if uid else None
except Exception as e:
print(f"❌ WebApp 鉴权错误: {e}")
return None
def need_auth(f):
@wraps(f)
def wrapper(*args, **kwargs):
uid = validate_webapp(request)
if not uid:
return jsonify({"ok": False, "msg": "未授权"}), 401
if check_rate_limit(uid):
return jsonify({"ok": False, "msg": "操作太频繁,请稍后再试"}), 429
return f(uid, *args, **kwargs)
return wrapper
@app.route('/')
def home():
return "Bot is running"
@app.route('/webapp')
def webapp_page():
return send_file('webapp.html')
# 👇 新增路由:专门用于分发生成的完整榜单外部网页
@app.route('/list/<cache_key>')
def view_list(cache_key):
html_content = HTML_CACHE.get(cache_key)
if not html_content:
return "暂无数据或页面已刷新,请等待机器人下次更新", 404
return html_content
@app.route('/api/data')
@need_auth
def api_get_data(uid):
user = DATA["users"].get(uid, {})
return jsonify({
"ok": True, "user": user,
"msg_count": len(DATA.get("msg_map", {})),
"webdav_url": FULL_WEBDAV_URL,
"userbot": TL_CLIENT is not None and TL_LOOP is not None
})
@app.route('/api/groups', methods=['POST'])
@need_auth
def api_add_group(uid):
d = request.json
DATA["users"][uid].setdefault("groups", []).append({"src": d["src"], "tgt": d["tgt"]})
save_data()
return jsonify({"ok": True, "user": DATA["users"][uid]})
@app.route('/api/groups/<int:idx>', methods=['DELETE'])
@need_auth
def api_del_group(uid, idx):
try:
del DATA["users"][uid]["groups"][idx]
save_data()
return jsonify({"ok": True, "user": DATA["users"][uid]})
except: return jsonify({"ok": False, "msg": "索引无效"})
@app.route('/api/channels', methods=['POST'])
@need_auth
def api_add_channel(uid):
d = request.json
DATA["users"][uid].setdefault("address_book", {})[d["id"]] = d["name"]
save_data()
return jsonify({"ok": True, "user": DATA["users"][uid]})
@app.route('/api/channels/<path:cid>', methods=['DELETE'])
@need_auth
def api_del_channel(uid, cid):
ab = DATA["users"][uid].get("address_book", {})
if cid in ab:
del ab[cid]
save_data()
return jsonify({"ok": True, "user": DATA["users"][uid]})
@app.route('/api/stats', methods=['POST'])
@need_auth
def api_add_stat(uid):
d = request.json
msg_id_raw = str(d.get("msg_id", ""))
if msg_id_raw.startswith('http'): msg_id_raw = msg_id_raw.split('/')[-1]
DATA["users"][uid].setdefault("stats_tasks", []).append({
"task_name": d["task_name"], "channel_id": d["channel_id"], "msg_id": msg_id_raw,
"table_title": d.get("table_title", "📊 统计"), "top_n": int(d.get("top_n", 10)),
"trigger_tag": d.get("trigger_tag", "#更新"), "interval": int(d.get("interval", 15)),
"duration": int(d.get("duration", 7)), "start_time": int(time.time()),
"last_run": 0, "completed_items": [],
"last_checked_msg_id": int(msg_id_raw) if msg_id_raw.isdigit() else 0,
"stats_blacklist": [x.strip() for x in re.split(r'[\s\n]+', d.get("stats_blacklist", "")) if x.strip()],
"blacklist_title": d.get("blacklist_title", "🚫本月轮换限制:")
})
save_data()
return jsonify({"ok": True, "user": DATA["users"][uid]})
@app.route('/api/stats/<int:idx>', methods=['PUT'])
@need_auth
def api_edit_stat(uid, idx):
d = request.json
field, val = d["field"], d["value"]
try:
task = DATA["users"][uid]["stats_tasks"][idx]
if field in ("top_n", "interval", "duration"):
task[field] = int(val)
elif field == "msg_id":
if val.startswith('http'): val = val.split('/')[-1]
task["msg_id"] = val
task['last_html_stats'] = ""
task['last_checked_msg_id'] = int(val) if val.isdigit() else 0
task['completed_items'] = []
if "file_msg_id" in task: del task["file_msg_id"]
elif field == "add_stats_bl":
task.setdefault("stats_blacklist", [])
new_items = [x.strip() for x in re.split(r'[\s\n]+', val) if x.strip()]
task["stats_blacklist"].extend(new_items)
task["stats_blacklist"] = list(set(task["stats_blacklist"]))
task["last_html_stats"] = ""
elif field == "rm_stats_bl":
to_remove = [x.strip() for x in re.split(r'[\s\n]+', val) if x.strip()]
task["stats_blacklist"] = [x for x in task.get("stats_blacklist", []) if x not in to_remove]
task["last_html_stats"] = ""
elif field == "blacklist_title":
task["blacklist_title"] = "" if val.strip() == "无" else val.strip()
task["last_html_stats"] = ""
else:
task[field] = val
task["last_html_stats"] = ""
task["last_run"] = 0
save_data()
return jsonify({"ok": True, "user": DATA["users"][uid]})
except Exception as e:
return jsonify({"ok": False, "msg": str(e)})
@app.route('/api/stats/<int:idx>', methods=['DELETE'])
@need_auth
def api_del_stat(uid, idx):
try:
del DATA["users"][uid]["stats_tasks"][idx]
save_data()
return jsonify({"ok": True, "user": DATA["users"][uid]})
except: return jsonify({"ok": False, "msg": "索引无效"})
@app.route('/api/dirs', methods=['POST'])
@need_auth
def api_add_dir(uid):
d = request.json
msg_id_raw = str(d.get("msg_id", ""))
if msg_id_raw.startswith('http'): msg_id_raw = msg_id_raw.split('/')[-1]
DATA["users"][uid].setdefault("dir_tasks", []).append({
"task_name": d["task_name"], "channel_id": d["channel_id"], "msg_id": msg_id_raw,
"blacklist": d.get("blacklist", []), "interval": 15,
"tags_cache": [], "tags_map": {}, "scanned_msgs": {}, "last_html_dir": ""
})
save_data()
return jsonify({"ok": True, "user": DATA["users"][uid]})
@app.route('/api/dirs/<int:idx>', methods=['PUT'])
@need_auth
def api_edit_dir(uid, idx):
d = request.json
field, val = d["field"], d["value"]
try:
task = DATA["users"][uid]["dir_tasks"][idx]
if field == "add_blacklist":
task["blacklist"].extend(val.split())
task["blacklist"] = list(set(task["blacklist"]))
elif field == "rm_blacklist":
to_rem = val.split()
task["blacklist"] = [t for t in task["blacklist"] if t not in to_rem]
elif field == "interval":
task["interval"] = int(val)
save_data()
return jsonify({"ok": True, "user": DATA["users"][uid]})
except Exception as e:
return jsonify({"ok": False, "msg": str(e)})
@app.route('/api/dirs/<int:idx>', methods=['DELETE'])
@need_auth
def api_del_dir(uid, idx):
try:
del DATA["users"][uid]["dir_tasks"][idx]
save_data()
return jsonify({"ok": True, "user": DATA["users"][uid]})
except: return jsonify({"ok": False, "msg": "索引无效"})
@app.route('/api/btn_new', methods=['POST'])
@need_auth
def api_btn_new(uid):
d = request.json
markup = types.InlineKeyboardMarkup()
markup.add(types.InlineKeyboardButton(text=d["btn_text"], url=d["url"]))
try:
bot.send_message(d["ch_id"], d["text"], reply_markup=markup, parse_mode="HTML")
return jsonify({"ok": True})
except Exception as e:
try:
bot.send_message(d["ch_id"], d["text"], reply_markup=markup)
return jsonify({"ok": True})
except Exception as e2:
return jsonify({"ok": False, "msg": str(e2)})
@app.route('/api/btn_old', methods=['POST'])
@need_auth
def api_btn_old(uid):
d = request.json
msg_id = d["msg_id"]
if msg_id.startswith('http'): msg_id = msg_id.split('/')[-1]
try:
if d["btn_text"].strip() == "删除":
bot.edit_message_reply_markup(chat_id=d["ch_id"], message_id=int(msg_id), reply_markup=None)
else:
markup = types.InlineKeyboardMarkup()
markup.add(types.InlineKeyboardButton(text=d["btn_text"], url=d["url"]))
bot.edit_message_reply_markup(chat_id=d["ch_id"], message_id=int(msg_id), reply_markup=markup)
return jsonify({"ok": True})
except Exception as e:
return jsonify({"ok": False, "msg": str(e)})
@app.route('/api/btn_multi', methods=['POST'])
@need_auth
def api_btn_multi(uid):
d = request.json
markup = types.InlineKeyboardMarkup(row_width=1)
for b in d["buttons"]:
markup.add(types.InlineKeyboardButton(text=b["text"], url=b["url"]))
try:
bot.send_message(d["ch_id"], d["text"], reply_markup=markup, parse_mode="HTML")
return jsonify({"ok": True})
except Exception as e:
try:
bot.send_message(d["ch_id"], d["text"], reply_markup=markup)
return jsonify({"ok": True})
except Exception as e2:
return jsonify({"ok": False, "msg": str(e2)})
@app.route('/api/gen_dir', methods=['POST'])
@need_auth
def api_gen_dir(uid):
d = request.json
Thread(target=generate_smart_directory, args=(uid, d["ch_id"])).start()
return jsonify({"ok": True})
@app.route('/api/replace_tag', methods=['POST'])
@need_auth
def api_replace_tag(uid):
d = request.json
new_tag = "" if d.get("new_tag") == "删除" else d.get("new_tag", "")
Thread(target=batch_replace_tags, args=(uid, d["ch_id"], d["old_tag"], new_tag)).start()
return jsonify({"ok": True})
@app.route('/api/backup', methods=['POST'])
@need_auth
def api_backup(uid):
d = request.json
try:
latest_id = int(d["link"].strip().split('/')[-1])
Thread(target=run_smart_backup_v2, args=(latest_id, uid, d["src"], d["tgt"])).start()
return jsonify({"ok": True})
except Exception as e:
return jsonify({"ok": False, "msg": str(e)})
# ====== SSE 实时事件流 ======
@app.route('/api/events')
def api_events():
class FakeReq:
def __init__(self, init_data_str):
self.headers = {'X-Init-Data': init_data_str}
init_data_str = request.args.get('init_data', '')
uid = validate_webapp(FakeReq(init_data_str))
if not uid:
return jsonify({"ok": False, "msg": "未授权"}), 401
def stream():
q = _event_queues[uid]
yield f"data: {json.dumps({'type': 'connected', 'data': '🟢 实时连接已建立'})}\n\n"
while True:
try:
event = q.get(timeout=30)
yield f"data: {json.dumps(event)}\n\n"
except queue.Empty:
yield f": heartbeat\n\n"
return Response(
stream(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no',
'Connection': 'keep-alive'
}
)
@app.route('/api/health')
def api_health():
return jsonify({
"status": "ok",
"users": len(DATA.get("users", {})),
"msg_map": len(DATA.get("msg_map", {})),
"telethon": TL_CLIENT is not None
})
# ===== 5. 启动点 =====
if __name__ == "__main__":
Thread(target=lambda: app.run(host="0.0.0.0", port=7860), daemon=True).start()
Thread(target=start_telethon_worker, daemon=True).start()
print("🔄 正在清除旧连接...")
for attempt in range(5):
try:
bot.remove_webhook()
bot.get_updates(offset=-1, timeout=1)
break
except Exception as e:
print(f"⏳ 等待旧实例释放... ({attempt+1}/5) {e}")
time.sleep(3)
print("🤖 Telebot 主消息引擎已启动!")
print("🌐 Mini App 地址: http://localhost:7860/webapp")
while True:
try:
bot.infinity_polling(
timeout=60,
long_polling_timeout=60,
allowed_updates=["message", "callback_query",
"channel_post", "edited_channel_post"]
)
except Exception as e:
print(f"❌ Polling 异常: {e}")
print("⏳ 10秒后重连...")
time.sleep(10)
try:
bot.remove_webhook()
bot.get_updates(offset=-1, timeout=1)
except:
pass