| import socket |
| import time |
| import os |
| import json |
| import asyncio |
| import html |
| import re |
| import hmac |
| import hashlib |
| import queue |
| from collections import defaultdict |
| from datetime import datetime, timezone, timedelta |
| from threading import Thread, Lock, Timer |
| from urllib.parse import parse_qs, unquote |
| from functools import wraps |
|
|
| import urllib3.util.connection as urllib3_cn |
|
|
| |
| def allowed_gai_family(): |
| return socket.AF_INET |
| urllib3_cn.allowed_gai_family = allowed_gai_family |
|
|
| import telebot |
| from telebot import types, apihelper |
| from telebot.apihelper import ApiTelegramException |
| from flask import Flask, request, jsonify, send_file, Response |
| from webdav4.client import Client as WebDAVClient |
| from telethon import TelegramClient |
| from telethon.sessions import StringSession |
| from telethon.extensions import html as tl_html |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler |
|
|
| try: |
| from pypinyin import lazy_pinyin |
| except ImportError: |
| print("⚠️ 未安装 pypinyin") |
|
|
| |
| DAV_URL_BASE = os.environ.get("WEBDAV_URL", "").rstrip("/") |
| DAV_USER = os.environ.get("WEBDAV_USER") or os.environ.get("WEBDAV_USERNAME") |
| DAV_PASS = os.environ.get("WEBDAV_PASS") or os.environ.get("WEBDAV_PASSWORD") |
| DAV_SUB_PATH = os.environ.get("WEBDAV_PATH", "").strip("/") |
| FULL_WEBDAV_URL = f"{DAV_URL_BASE}/{DAV_SUB_PATH}/" if DAV_SUB_PATH else f"{DAV_URL_BASE}/" |
| REMOTE_FILENAME = "tg_bot_data_v5.json" |
|
|
| DATA = {"users": {}, "msg_map": {}, "backup_log": {}} |
| data_lock = Lock() |
|
|
| |
| HTML_CACHE = {} |
|
|
| TL_LOOP = None |
| TL_CLIENT = None |
| SEPARATOR_MARK = "\n\n➖➖➖➖➖➖\n" |
|
|
| def get_dav_client(): |
| return WebDAVClient(base_url=FULL_WEBDAV_URL, auth=(DAV_USER, DAV_PASS)) |
|
|
| def load_data(): |
| global DATA |
| try: |
| print(f"📡 正在尝试连接 WebDAV...") |
| client = get_dav_client() |
| if client.exists(REMOTE_FILENAME): |
| client.download_file(REMOTE_FILENAME, "temp_data.json") |
| with open("temp_data.json", "r") as f: |
| loaded_data = json.load(f) |
| DATA["users"] = loaded_data.get("users", {}) |
| DATA["msg_map"] = loaded_data.get("msg_map", {}) |
| DATA["backup_log"] = loaded_data.get("backup_log", {}) |
| print(f"✅ 成功下载恢复云端数据!映射条数: {len(DATA['msg_map'])}") |
| else: |
| save_data(force=True) |
| except Exception as e: print(f"❌ WebDAV 初始化出错: {e}") |
|
|
| def save_data(force=True): |
| with data_lock: |
| try: |
| with open("temp_data.json", "w") as f: json.dump(DATA, f) |
| get_dav_client().upload_file("temp_data.json", REMOTE_FILENAME, overwrite=True) |
| print(f"☁️ [WebDAV] 数据已同步: {time.strftime('%H:%M:%S')}") |
| except Exception as e: print(f"❌ [WebDAV] 上传失败: {e}") |
|
|
| load_data() |
|
|
| |
| BOT_TOKEN = os.environ.get("BOT_TOKEN") |
| bot = telebot.TeleBot(BOT_TOKEN) |
| apihelper.API_URL = "https://nine7.linlizhi0210.workers.dev/bot{0}/{1}" |
|
|
| user_states = {} |
| ALL_TYPES = ['text', 'audio', 'document', 'photo', 'sticker', 'video', 'video_note', 'voice', 'location', 'contact', 'animation', 'dice', 'poll'] |
|
|
| |
| _rate_limit = defaultdict(list) |
| RATE_LIMIT_MAX = 30 |
| RATE_LIMIT_WINDOW = 60 |
|
|
| def check_rate_limit(uid): |
| now = time.time() |
| _rate_limit[uid] = [t for t in _rate_limit[uid] if now - t < RATE_LIMIT_WINDOW] |
| if len(_rate_limit[uid]) >= RATE_LIMIT_MAX: |
| return True |
| _rate_limit[uid].append(now) |
| return False |
|
|
| |
| _event_queues = defaultdict(lambda: queue.Queue(maxsize=50)) |
|
|
| def push_event(uid, event_type, data): |
| q = _event_queues.get(uid) |
| if q: |
| try: |
| q.put_nowait({ |
| "type": event_type, |
| "data": data, |
| "time": time.strftime("%H:%M:%S") |
| }) |
| except queue.Full: |
| pass |
|
|
| try: |
| bot.set_my_commands([ |
| types.BotCommand("start", "🤖 初始化控制台"), |
| types.BotCommand("channels", "📔 频道地址簿"), |
| types.BotCommand("btn_new", "🆕 发送带按钮消息"), |
| types.BotCommand("btn_old", "🔘 给旧消息加按钮"), |
| types.BotCommand("add_stat", "📈 创建统计任务"), |
| types.BotCommand("list_stat", "📊 管理统计任务"), |
| types.BotCommand("add_dir", "🗂️ 创建自动目录"), |
| types.BotCommand("list_dir", "📂 管理目录任务"), |
| types.BotCommand("dir", "🗂️ 生成手动目录"), |
| types.BotCommand("replace_tag", "🔄 批量替换标签"), |
| types.BotCommand("backup", "🚀 智能全量备份"), |
| types.BotCommand("add", "➕ 添加同步组"), |
| types.BotCommand("list", "📋 我的同步组"), |
| types.BotCommand("status", "⚙️ 存储状态") |
| ]) |
| except Exception: pass |
|
|
| media_group_cache = {} |
| mg_lock = Lock() |
|
|
| def send_channel_prompt(uid, text): |
| address_book = DATA["users"].get(uid, {}).get("address_book", {}) |
| markup = None |
| if address_book: |
| markup = types.InlineKeyboardMarkup(row_width=2) |
| btns = [types.InlineKeyboardButton(f"📔 {name}", callback_data=f"selch_{cid}") for cid, name in address_book.items()] |
| markup.add(*btns) |
| bot.send_message(uid, text, reply_markup=markup, parse_mode="Markdown") |
|
|
| @bot.message_handler(commands=['start']) |
| def cmd_start(message): |
| uid = str(message.from_user.id) |
| if uid not in DATA["users"]: |
| DATA["users"][uid] = {"groups": [], "stats_tasks": [], "dir_tasks": [], "address_book": {}} |
| save_data() |
| bot.reply_to(message, f"🤖 控制台已就绪\n云端路径:`{FULL_WEBDAV_URL}`\n👇 请点击左下角 **Menu** 展开操作菜单。", parse_mode="Markdown") |
|
|
| @bot.message_handler(commands=['channels']) |
| def cmd_channels(message): |
| uid = str(message.from_user.id) |
| address_book = DATA["users"].get(uid, {}).get("address_book", {}) |
| msg_text = "📔 **我的频道地址簿**\n\n" |
| if not address_book: msg_text += "暂无保存的频道。请点击下方添加。" |
| else: |
| for cid, name in address_book.items(): msg_text += f"▪️ **{name}**: `{cid}`\n" |
| markup = types.InlineKeyboardMarkup() |
| markup.add(types.InlineKeyboardButton("➕ 添加新频道", callback_data="ab_add")) |
| if address_book: markup.add(types.InlineKeyboardButton("🗑️ 删除频道", callback_data="ab_del_menu")) |
| bot.send_message(uid, msg_text, reply_markup=markup, parse_mode="Markdown") |
|
|
| @bot.message_handler(commands=['btn_new']) |
| def cmd_btn_new(message): |
| uid = str(message.from_user.id) |
| user_states[uid] = {"step": "WAIT_BTN_NEW_CH"} |
| send_channel_prompt(uid, "🆕 **发送带按钮的新消息**\n\n1️⃣ 请输入目标【频道 ID】(或点击下方地址簿选择):") |
|
|
| @bot.message_handler(commands=['btn_old']) |
| def cmd_btn_old(message): |
| uid = str(message.from_user.id) |
| user_states[uid] = {"step": "WAIT_BTN_OLD_CH"} |
| send_channel_prompt(uid, "🔘 **给旧消息加按钮 (或修改/删除)**\n\n1️⃣ 请输入目标【频道 ID】(或点击下方地址簿选择):") |
|
|
| @bot.message_handler(commands=['add', 'list', 'status', 'backup']) |
| def handle_basic_commands(message): |
| uid = str(message.from_user.id) |
| cmd = message.text.split()[0] |
| if cmd == '/add': |
| user_states[uid] = {"step": "WAIT_SRC"} |
| send_channel_prompt(uid, "请输入源频道 ID:") |
| elif cmd == '/list': |
| groups = DATA["users"].get(uid, {}).get("groups", []) |
| if not groups: return bot.send_message(uid, "暂无同步任务。") |
| for i, g in enumerate(groups): bot.send_message(uid, f"📦 组 {i+1}\n源: `{g['src']}`\n目: `{g['tgt']}`", reply_markup=types.InlineKeyboardMarkup().add(types.InlineKeyboardButton("🗑 删除", callback_data=f"del_{i}"))) |
| elif cmd == '/status': bot.send_message(uid, f"⚙️ 状态\n映射条数: {len(DATA['msg_map'])}") |
| elif cmd == '/backup': |
| groups = DATA["users"].get(uid, {}).get("groups", []) |
| markup = types.InlineKeyboardMarkup(row_width=1) |
| for i, g in enumerate(groups): markup.add(types.InlineKeyboardButton(f"📦 同步组 {i+1} 备份", callback_data=f"bkp_g_{i}")) |
| markup.add(types.InlineKeyboardButton("✏️ 手动指定通道", callback_data="bkp_m")) |
| bot.send_message(uid, "🚀 **请选择智能备份通道:**", reply_markup=markup, parse_mode="Markdown") |
|
|
| @bot.message_handler(commands=['add_stat']) |
| def cmd_add_stat(message): |
| uid = str(message.from_user.id) |
| user_states[uid] = {"step": "WAIT_STAT_NAME"} |
| bot.send_message(uid, "📈 **创建自定义统计任务**\n\n1️⃣ 请给这个任务起个名称 (如: `日常早报统计`):", parse_mode="Markdown") |
|
|
| @bot.message_handler(commands=['list_stat']) |
| def cmd_list_stat(message): |
| uid = str(message.from_user.id) |
| stats = DATA["users"].get(uid, {}).get("stats_tasks", []) |
| if not stats: return bot.send_message(uid, "暂无统计任务。") |
| for i, t in enumerate(stats): |
| msg = (f"🎯 **任务**: `{t.get('task_name', '未命名')}`\n📌 **频道**: `{t.get('channel_id')}` | **消息**: `{t.get('msg_id')}`\n" |
| f"📊 **表头**: `{t.get('table_title', '未设置标题')}`\n🔑 **触发标签**: `{t.get('trigger_tag', '#未设置')}`\n" |
| f"🏆 **上榜名额**: 前 `{t.get('top_n', 10)}` 名\n⏱ **更新**: 每 `{t.get('interval', 60)}` 分钟\n⏳ **剩余**: `{t.get('duration', 7)}` 天\n" |
| f"🚫 **屏蔽名单**: `{', '.join(t.get('stats_blacklist', [])) or '无'}`\n" |
| f"📝 **屏蔽区标题**: `{t.get('blacklist_title', '🚫本月轮换限制:') or '无'}`") |
| markup = types.InlineKeyboardMarkup(row_width=2) |
| markup.add(types.InlineKeyboardButton("✏️ 频道 ID", callback_data=f"e_chid_{i}"), types.InlineKeyboardButton("✏️ 消息 ID", callback_data=f"e_msgid_{i}")) |
| markup.add(types.InlineKeyboardButton("✏️ 任务名", callback_data=f"e_name_{i}"), types.InlineKeyboardButton("✏️ 表头标题", callback_data=f"e_titl_{i}")) |
| markup.add(types.InlineKeyboardButton("✏️ 触发标签", callback_data=f"e_trig_{i}"), types.InlineKeyboardButton("🏆 上榜名额", callback_data=f"e_topn_{i}")) |
| markup.add(types.InlineKeyboardButton("⏱ 更新频率", callback_data=f"e_intv_{i}"), types.InlineKeyboardButton("⏳ 存活期限", callback_data=f"e_dura_{i}")) |
| markup.add(types.InlineKeyboardButton("🚫 加屏蔽", callback_data=f"e_sabl_{i}"), types.InlineKeyboardButton("✅ 删屏蔽", callback_data=f"e_srbl_{i}")) |
| markup.add(types.InlineKeyboardButton("📝 屏蔽区标题", callback_data=f"e_sblt_{i}")) |
| markup.add(types.InlineKeyboardButton("🗑️ 删除该任务", callback_data=f"d_s_{i}")) |
| bot.send_message(uid, msg, reply_markup=markup, parse_mode="Markdown") |
|
|
| @bot.message_handler(commands=['dir']) |
| def cmd_build_dir(message): |
| uid = str(message.from_user.id) |
| user_states[uid] = {"step": "WAIT_MANUAL_DIR_CH"} |
| send_channel_prompt(uid, "🗂️ **生成频道标签目录**\n\n请输入需要扫描的【频道 ID】(例如 `-10012345678`):") |
|
|
| @bot.message_handler(commands=['add_dir']) |
| def cmd_add_dir(message): |
| uid = str(message.from_user.id) |
| user_states[uid] = {"step": "WAIT_DIR_NAME"} |
| bot.send_message(uid, "🗂️ **创建自动更新目录任务**\n\n1️⃣ 请给任务起个名字 (如: `主频道自动目录`):", parse_mode="Markdown") |
|
|
| @bot.message_handler(commands=['list_dir']) |
| def cmd_list_dir(message): |
| uid = str(message.from_user.id) |
| dirs = DATA["users"].get(uid, {}).get("dir_tasks", []) |
| if not dirs: return bot.send_message(uid, "暂无自动目录任务。") |
| for i, t in enumerate(dirs): |
| msg = (f"🗂️ **任务**: `{t.get('task_name', '未命名')}`\n" |
| f"📌 **频道**: `{t.get('channel_id')}` | **承载消息**: `{t.get('msg_id')}`\n" |
| f"⏱ **频率**: 每 `{t.get('interval', 15)}` 分钟扫描一次\n" |
| f"🛡️ **屏蔽标签**: `{', '.join(t.get('blacklist', [])) or '无'}`\n" |
| f"📦 **已收录标签**: `{len(t.get('tags_cache', []))} 个`") |
| markup = types.InlineKeyboardMarkup(row_width=2) |
| markup.add(types.InlineKeyboardButton("➕ 加屏蔽", callback_data=f"ed_ab_{i}"), types.InlineKeyboardButton("➖ 删屏蔽", callback_data=f"ed_rb_{i}")) |
| markup.add(types.InlineKeyboardButton("⏱ 扫描频率", callback_data=f"ed_in_{i}")) |
| markup.add(types.InlineKeyboardButton("🗑️ 终止并删除该目录任务", callback_data=f"d_d_{i}")) |
| bot.send_message(uid, msg, reply_markup=markup, parse_mode="Markdown") |
|
|
| @bot.message_handler(commands=['replace_tag']) |
| def cmd_replace_tag(message): |
| uid = str(message.from_user.id) |
| user_states[uid] = {"step": "WAIT_REP_CH"} |
| send_channel_prompt(uid, "🔄 **批量替换/删除标签**\n\n1️⃣ 请输入需要处理的【频道 ID】:") |
|
|
| def process_user_text(uid, text): |
| if uid not in DATA["users"]: |
| DATA["users"][uid] = {"groups": [], "stats_tasks": [], "dir_tasks": [], "address_book": {}} |
| save_data() |
| if uid in user_states: |
| state = user_states[uid] |
| step = state["step"] |
|
|
| if step == "WAIT_BTN_NEW_CH": |
| user_states[uid].update({"step": "WAIT_BTN_NEW_TEXT", "ch_id": text}) |
| bot.send_message(uid, "2️⃣ 请输入要发送的**消息正文** (支持 HTML/Markdown,不需要排版可直接打字):") |
| elif step == "WAIT_BTN_NEW_TEXT": |
| user_states[uid].update({"step": "WAIT_BTN_NEW_BTEXT", "msg_text": text}) |
| bot.send_message(uid, "3️⃣ 请输入**按钮上显示的文字** (例如: `🔗 点击进入主群`):") |
| elif step == "WAIT_BTN_NEW_BTEXT": |
| user_states[uid].update({"step": "WAIT_BTN_NEW_URL", "btn_text": text}) |
| bot.send_message(uid, "4️⃣ 请输入**按钮跳转的链接 URL** (必须以 http/https 开头):") |
| elif step == "WAIT_BTN_NEW_URL": |
| if not text.startswith("http"): |
| return bot.send_message(uid, "❌ 链接格式错误!必须以 http:// 或 https:// 开头。请重新输入:") |
| markup = types.InlineKeyboardMarkup() |
| markup.add(types.InlineKeyboardButton(text=state["btn_text"], url=text)) |
| try: |
| bot.send_message(state["ch_id"], state["msg_text"], reply_markup=markup, parse_mode="HTML") |
| bot.send_message(uid, f"✅ 成功!消息已发送至频道 `{state['ch_id']}`") |
| except Exception as e: |
| try: |
| bot.send_message(state["ch_id"], state["msg_text"], reply_markup=markup) |
| bot.send_message(uid, f"✅ 成功!消息已作为纯文本发送至频道 `{state['ch_id']}`") |
| except Exception as e2: |
| bot.send_message(uid, f"❌ 发送失败,请检查频道ID和机器人的权限: {e2}") |
| user_states.pop(uid) |
|
|
| elif step == "WAIT_BTN_OLD_CH": |
| user_states[uid].update({"step": "WAIT_BTN_OLD_MSG", "ch_id": text}) |
| bot.send_message(uid, "2️⃣ 请输入目标**消息的 ID 或者 链接**:") |
| elif step == "WAIT_BTN_OLD_MSG": |
| if text.startswith('http'): text = text.split('/')[-1] |
| if not text.isdigit(): return bot.send_message(uid, "❌ 格式错误!请输入纯数字 ID 或者包含 ID 的链接:") |
| user_states[uid].update({"step": "WAIT_BTN_OLD_BTEXT", "msg_id": int(text)}) |
| bot.send_message(uid, "3️⃣ 请输入**按钮上显示的文字**:\n💡 如果你想**删除**该消息现有的按钮,请直接回复 `删除`") |
| elif step == "WAIT_BTN_OLD_BTEXT": |
| if text.strip() == "删除": |
| try: |
| bot.edit_message_reply_markup(chat_id=state["ch_id"], message_id=state["msg_id"], reply_markup=None) |
| bot.send_message(uid, f"✅ 成功!已移除了消息 `{state['msg_id']}` 上的所有按钮。") |
| except Exception as e: |
| bot.send_message(uid, f"❌ 操作失败: {e}") |
| user_states.pop(uid) |
| return |
| user_states[uid].update({"step": "WAIT_BTN_OLD_URL", "btn_text": text}) |
| bot.send_message(uid, "4️⃣ 请输入**按钮跳转的链接 URL**:") |
| elif step == "WAIT_BTN_OLD_URL": |
| if not text.startswith("http"): |
| return bot.send_message(uid, "❌ 链接格式错误!必须以 http:// 或 https:// 开头。请重新输入:") |
| markup = types.InlineKeyboardMarkup() |
| markup.add(types.InlineKeyboardButton(text=state["btn_text"], url=text)) |
| try: |
| bot.edit_message_reply_markup(chat_id=state["ch_id"], message_id=state["msg_id"], reply_markup=markup) |
| bot.send_message(uid, f"✅ 成功!已为消息 `{state['msg_id']}` 更新了按钮。") |
| except Exception as e: |
| bot.send_message(uid, f"❌ 操作失败: {e}") |
| user_states.pop(uid) |
|
|
| elif step == "WAIT_AB_ID": |
| user_states[uid].update({"step": "WAIT_AB_NAME", "ab_id": text}) |
| bot.send_message(uid, "📝 请为这个频道输入一个**自定义备注名**:") |
| elif step == "WAIT_AB_NAME": |
| if "address_book" not in DATA["users"][uid]: DATA["users"][uid]["address_book"] = {} |
| DATA["users"][uid]["address_book"][state["ab_id"]] = text |
| save_data() |
| bot.send_message(uid, f"✅ 已将频道 `{state['ab_id']}` 保存为 **{text}**!") |
| user_states.pop(uid) |
|
|
| elif step == "WAIT_SRC": |
| user_states[uid] = {"step": "WAIT_TGT", "src": text} |
| send_channel_prompt(uid, "请输入目标频道 ID:") |
| elif step == "WAIT_TGT": |
| DATA["users"][uid]["groups"].append({"src": state["src"], "tgt": text}) |
| save_data() |
| bot.send_message(uid, "✅ 转发配置完毕!") |
| user_states.pop(uid) |
|
|
| elif step == "WAIT_STAT_NAME": |
| user_states[uid].update({"step": "WAIT_STAT_CH", "task_name": text}) |
| send_channel_prompt(uid, "2️⃣ 请输入**频道 ID**:") |
| elif step == "WAIT_STAT_CH": |
| user_states[uid].update({"step": "WAIT_STAT_MSG", "ch_id": text}) |
| bot.send_message(uid, "3️⃣ 请输入**消息 ID 或者 消息链接**:") |
| elif step == "WAIT_STAT_MSG": |
| if text.startswith('http'): text = text.split('/')[-1] |
| if not text.isdigit(): return bot.send_message(uid, "❌ 格式错误!请重新输入消息 ID:") |
| user_states[uid].update({"step": "WAIT_STAT_TITLE", "msg_id": text}) |
| bot.send_message(uid, "4️⃣ 请输入**统计表显示的标题**:") |
| elif step == "WAIT_STAT_TITLE": |
| user_states[uid].update({"step": "WAIT_STAT_TOPN", "table_title": text}) |
| bot.send_message(uid, "5️⃣ 请输入**只显示前几名** (纯数字):") |
| elif step == "WAIT_STAT_TOPN": |
| if not text.isdigit(): return bot.send_message(uid, "❌ 只能输入纯数字!") |
| user_states[uid].update({"step": "WAIT_STAT_TRIGGER", "top_n": int(text)}) |
| bot.send_message(uid, "6️⃣ 请输入**触发更新的自定义标签** (带 # 号):") |
| elif step == "WAIT_STAT_TRIGGER": |
| if not text.startswith('#'): return bot.send_message(uid, "❌ 必须以 # 开头!") |
| user_states[uid].update({"step": "WAIT_STAT_INTERVAL", "trigger_tag": text}) |
| bot.send_message(uid, "7️⃣ 设置**更新频率** (分钟):") |
| elif step == "WAIT_STAT_INTERVAL": |
| if not text.isdigit(): return bot.send_message(uid, "❌ 只能输入纯数字!") |
| user_states[uid].update({"step": "WAIT_STAT_DURATION", "interval": int(text)}) |
| bot.send_message(uid, "8️⃣ 设置**寿命期限** (天):") |
| elif step == "WAIT_STAT_DURATION": |
| if not text.isdigit(): return bot.send_message(uid, "❌ 只能输入纯数字!") |
| user_states[uid].update({"step": "WAIT_STAT_BLACKLIST", "duration": int(text)}) |
| bot.send_message(uid, "9️⃣ 请输入**屏蔽名单** (用空格隔开)\n💡 不需要屏蔽请回复 `无`:") |
| |
| elif step == "WAIT_STAT_BLACKLIST": |
| blacklist = [] if text.strip() == "无" else [x.strip() for x in re.split(r'[\s\n]+', text) if x.strip()] |
| user_states[uid].update({"step": "WAIT_STAT_BL_TITLE", "stats_blacklist": blacklist}) |
| bot.send_message(uid, "🔟 请输入**屏蔽区的标题** (例如 `🚫本月轮换限制:`)\n💡 不需要请回复 `无`:") |
| |
| elif step == "WAIT_STAT_BL_TITLE": |
| bl_title = "" if text.strip() == "无" else text.strip() |
| if "stats_tasks" not in DATA["users"][uid]: DATA["users"][uid]["stats_tasks"] = [] |
| DATA["users"][uid]["stats_tasks"].append({ |
| "task_name": state["task_name"], "channel_id": state["ch_id"], "msg_id": state["msg_id"], |
| "table_title": state["table_title"], "top_n": state["top_n"], "trigger_tag": state["trigger_tag"], |
| "interval": state["interval"], "duration": state["duration"], "start_time": int(time.time()), |
| "last_run": 0, "completed_items": [], "last_checked_msg_id": int(state["msg_id"]), |
| "stats_blacklist": state["stats_blacklist"], |
| "blacklist_title": bl_title |
| }) |
| save_data() |
| bot.send_message(uid, "✅ 完美!任务已创建。") |
| user_states.pop(uid) |
|
|
| elif step.startswith("EDIT_STAT_"): |
| idx = state["idx"] |
| try: |
| task = DATA["users"][uid]["stats_tasks"][idx] |
| if step == "EDIT_STAT_NAME": task["task_name"] = text |
| elif step == "EDIT_STAT_TITL": task["table_title"] = text |
| elif step == "EDIT_STAT_TOPN": task["top_n"] = int(text) |
| elif step == "EDIT_STAT_INTV": task["interval"] = int(text) |
| elif step == "EDIT_STAT_DURA": task["duration"] = int(text) |
| elif step == "EDIT_STAT_CHID": task["channel_id"] = text |
| elif step == "EDIT_STAT_TRIG": |
| if not text.startswith('#'): return bot.send_message(uid, "❌ 必须以 # 开头!") |
| task["trigger_tag"] = text |
| elif step == "EDIT_STAT_MSGID": |
| if text.startswith('http'): text = text.split('/')[-1] |
| if not text.isdigit(): return bot.send_message(uid, "❌ 格式错误!") |
| task["msg_id"] = text |
| task['last_checked_msg_id'] = int(text) |
| task['completed_items'] = [] |
| elif step == "EDIT_STAT_ADDBL": |
| task.setdefault("stats_blacklist", []) |
| new_items = [x.strip() for x in re.split(r'[\s\n]+', text) if x.strip()] |
| task["stats_blacklist"].extend(new_items) |
| task["stats_blacklist"] = list(set(task["stats_blacklist"])) |
| elif step == "EDIT_STAT_RMBL": |
| to_remove = [x.strip() for x in re.split(r'[\s\n]+', text) if x.strip()] |
| task["stats_blacklist"] = [x for x in task.get("stats_blacklist", []) if x not in to_remove] |
| elif step == "EDIT_STAT_BLTITLE": |
| task["blacklist_title"] = "" if text.strip() == "无" else text.strip() |
| |
| task["last_html_stats"] = "" |
| task["last_run"] = 0 |
| |
| save_data() |
| bot.send_message(uid, "✅ 属性已修改!下次刷新周期将立即更新。") |
| except Exception as e: bot.send_message(uid, f"❌ 修改失败: {e}") |
| user_states.pop(uid) |
| |
| elif step == "WAIT_BACKUP_SRC": |
| user_states[uid] = {"step": "WAIT_BACKUP_TGT", "src": text} |
| send_channel_prompt(uid, "📌 请输入【目标频道 ID】:") |
| elif step == "WAIT_BACKUP_TGT": |
| user_states[uid].update({"step": "WAIT_BACKUP_ID_M", "tgt": text}) |
| bot.send_message(uid, "🚀 请粘贴源频道中最新那条消息的链接:") |
| elif step in ["WAIT_BACKUP_ID_G", "WAIT_BACKUP_ID_M"]: |
| try: |
| latest_id = int(text.strip().split('/')[-1]) |
| if step == "WAIT_BACKUP_ID_G": |
| idx = state["g_idx"] |
| src = DATA["users"][uid]["groups"][idx]["src"] |
| tgt = DATA["users"][uid]["groups"][idx]["tgt"] |
| else: |
| src = state["src"] |
| tgt = state["tgt"] |
| Thread(target=run_smart_backup_v2, args=(latest_id, uid, src, tgt)).start() |
| user_states.pop(uid) |
| except Exception as e: bot.send_message(uid, f"❌ 链接解析失败: {e}") |
|
|
| elif step == "WAIT_MANUAL_DIR_CH": |
| bot.send_message(uid, "🔍 正在扫描频道历史标签,请耐心等待...") |
| Thread(target=generate_smart_directory, args=(uid, text)).start() |
| user_states.pop(uid) |
| elif step == "WAIT_DIR_NAME": |
| user_states[uid].update({"step": "WAIT_DIR_CH", "task_name": text}) |
| send_channel_prompt(uid, "2️⃣ 请输入**频道 ID**:") |
| elif step == "WAIT_DIR_CH": |
| user_states[uid].update({"step": "WAIT_DIR_MSG", "ch_id": text}) |
| bot.send_message(uid, "3️⃣ 请输入承载目录的**消息 ID**:") |
| elif step == "WAIT_DIR_MSG": |
| if text.startswith('http'): text = text.split('/')[-1] |
| user_states[uid].update({"step": "WAIT_DIR_BLACKLIST", "msg_id": text}) |
| bot.send_message(uid, "4️⃣ 请输入需要**屏蔽的标签** (空格隔开,不屏蔽回复 `无`):") |
| elif step == "WAIT_DIR_BLACKLIST": |
| blacklist = [] if text.strip() == "无" else text.split() |
| if "dir_tasks" not in DATA["users"][uid]: DATA["users"][uid]["dir_tasks"] = [] |
| DATA["users"][uid]["dir_tasks"].append({ |
| "task_name": state["task_name"], "channel_id": state["ch_id"], |
| "msg_id": state["msg_id"], "blacklist": blacklist, "interval": 15, |
| "tags_cache": [], "tags_map": {}, "scanned_msgs": {}, "last_html_dir": "" |
| }) |
| save_data() |
| bot.send_message(uid, "✅ 目录任务建立完成!") |
| user_states.pop(uid) |
|
|
| elif step.startswith("EDIT_DIR_"): |
| idx = state["idx"] |
| try: |
| task = DATA["users"][uid]["dir_tasks"][idx] |
| if step == "EDIT_DIR_ADDBL": |
| task["blacklist"].extend(text.split()) |
| task["blacklist"] = list(set(task["blacklist"])) |
| elif step == "EDIT_DIR_RMBL": |
| to_rem = text.split() |
| task["blacklist"] = [t for t in task["blacklist"] if t not in to_rem] |
| elif step == "EDIT_DIR_INTV": |
| task["interval"] = int(text) |
| save_data() |
| bot.send_message(uid, "✅ 目录属性已修改!") |
| except Exception as e: bot.send_message(uid, f"❌ 修改失败: {e}") |
| user_states.pop(uid) |
|
|
| elif step == "WAIT_REP_CH": |
| user_states[uid].update({"step": "WAIT_REP_OLD", "ch_id": text}) |
| bot.send_message(uid, "2️⃣ 请输入【要被替换的旧标签】(带 #):") |
| elif step == "WAIT_REP_OLD": |
| if not text.startswith('#'): return bot.send_message(uid, "❌ 必须以 # 开头!") |
| user_states[uid].update({"step": "WAIT_REP_NEW", "old_tag": text}) |
| bot.send_message(uid, "3️⃣ 请输入【新标签】(想删除回复 `删除`):") |
| elif step == "WAIT_REP_NEW": |
| new_tag = "" if text == "删除" else text |
| ch_id = state["ch_id"] |
| old_tag = state["old_tag"] |
| bot.send_message(uid, f"🚀 开始批量替换...") |
| Thread(target=batch_replace_tags, args=(uid, ch_id, old_tag, new_tag)).start() |
| user_states.pop(uid) |
| else: bot.send_message(uid, "💡 请用 **Menu** 菜单选择指令。") |
|
|
| @bot.message_handler(func=lambda m: m.chat.type == 'private') |
| def handle_private(message): |
| uid = str(message.from_user.id) |
| text = message.text.strip() |
| process_user_text(uid, text) |
|
|
| @bot.callback_query_handler(func=lambda call: call.data.startswith("selch_")) |
| def handle_selch(call): |
| uid = str(call.from_user.id) |
| ch_id = call.data.split("_", 1)[1] |
| bot.send_message(uid, f"👉 已快捷选择频道: `{ch_id}`", parse_mode="Markdown") |
| process_user_text(uid, ch_id) |
| bot.answer_callback_query(call.id) |
|
|
| @bot.callback_query_handler(func=lambda call: call.data.startswith("bkp_")) |
| def handle_backup_callbacks(call): |
| uid = str(call.from_user.id) |
| if call.data == "bkp_m": |
| user_states[uid] = {"step": "WAIT_BACKUP_SRC"} |
| send_channel_prompt(uid, "✏️ **手动备份模式**\n\n请输入【源频道 ID】:") |
| elif call.data.startswith("bkp_g_"): |
| idx = int(call.data.split("_")[2]) |
| user_states[uid] = {"step": "WAIT_BACKUP_ID_G", "g_idx": idx} |
| bot.edit_message_text(f"🚀 已选择 **组 {idx+1}**\n\n请粘贴源频道最新消息的链接:", call.message.chat.id, call.message.message_id, parse_mode="Markdown") |
|
|
| @bot.callback_query_handler(func=lambda call: True) |
| def handle_callbacks(call): |
| uid = str(call.from_user.id) |
| data = call.data |
| if data == "ab_add": |
| user_states[uid] = {"step": "WAIT_AB_ID"} |
| bot.send_message(uid, "📌 请输入要保存的**频道 ID**:") |
| return bot.answer_callback_query(call.id) |
| elif data == "ab_del_menu": |
| address_book = DATA["users"].get(uid, {}).get("address_book", {}) |
| markup = types.InlineKeyboardMarkup(row_width=1) |
| for cid, name in address_book.items(): markup.add(types.InlineKeyboardButton(f"❌ 删除: {name}", callback_data=f"ab_del_{cid}")) |
| bot.edit_message_text("🗑️ 请选择要删除的频道:", call.message.chat.id, call.message.message_id, reply_markup=markup) |
| return bot.answer_callback_query(call.id) |
| elif data.startswith("ab_del_"): |
| cid = data.split("ab_del_")[1] |
| if "address_book" in DATA["users"][uid] and cid in DATA["users"][uid]["address_book"]: |
| del DATA["users"][uid]["address_book"][cid] |
| save_data() |
| bot.edit_message_text("✅ 频道已移除。", call.message.chat.id, call.message.message_id) |
| return bot.answer_callback_query(call.id) |
| elif data.startswith("selch_") or data.startswith("bkp_"): return |
| try: |
| action, idx_str = data.rsplit("_", 1) |
| idx = int(idx_str) |
| except ValueError: return |
| if action == "del": |
| del DATA["users"][uid]["groups"][idx] |
| bot.edit_message_text("❌ 转发任务已移除", call.message.chat.id, call.message.message_id) |
| save_data() |
| elif action == "d_s": |
| del DATA["users"][uid]["stats_tasks"][idx] |
| bot.edit_message_text("❌ 统计任务已移除", call.message.chat.id, call.message.message_id) |
| save_data() |
| elif action == "d_d": |
| del DATA["users"][uid]["dir_tasks"][idx] |
| bot.edit_message_text("❌ 目录任务已移除", call.message.chat.id, call.message.message_id) |
| save_data() |
| elif action in ["e_name", "e_titl", "e_trig", "e_topn", "e_intv", "e_dura", "e_chid", "e_msgid", "e_sabl", "e_srbl", "e_sblt", "ed_ab", "ed_rb", "ed_in"]: |
| prompt_map = { |
| "e_name": "📌 请输入新的任务名称:", |
| "e_titl": "📌 请输入新的表头标题:", |
| "e_trig": "📌 请输入新的触发标签(带#):", |
| "e_topn": "📌 请输入新的上榜名额(纯数字):", |
| "e_intv": "📌 请输入新的更新频率(分钟):", |
| "e_dura": "📌 请输入新的存活期限(天):", |
| "e_chid": "📌 请输入新的频道 ID:", |
| "e_msgid": "📌 请输入新的消息 ID 或链接:", |
| "e_sabl": "🚫 请输入要**屏蔽的名字** (空格隔开):", |
| "e_srbl": "✅ 请输入要**解除屏蔽的名字** (空格隔开):", |
| "e_sblt": "📝 请输入**屏蔽区的显示标题**\n(例如 `🚫本月轮换限制:`,回复 `无` 则不显示):", |
| "ed_ab": "📌 请输入要追加的屏蔽标签(空格隔开):", |
| "ed_rb": "📌 请输入要移出屏蔽的标签(空格隔开):", |
| "ed_in": "📌 请输入新的扫描频率(分钟):" |
| } |
| state_map = { |
| "e_name": "EDIT_STAT_NAME", |
| "e_titl": "EDIT_STAT_TITL", |
| "e_trig": "EDIT_STAT_TRIG", |
| "e_topn": "EDIT_STAT_TOPN", |
| "e_intv": "EDIT_STAT_INTV", |
| "e_dura": "EDIT_STAT_DURA", |
| "e_chid": "EDIT_STAT_CHID", |
| "e_msgid": "EDIT_STAT_MSGID", |
| "e_sabl": "EDIT_STAT_ADDBL", |
| "e_srbl": "EDIT_STAT_RMBL", |
| "e_sblt": "EDIT_STAT_BLTITLE", |
| "ed_ab": "EDIT_DIR_ADDBL", |
| "ed_rb": "EDIT_DIR_RMBL", |
| "ed_in": "EDIT_DIR_INTV" |
| } |
| user_states[uid] = {"step": state_map[action], "idx": idx} |
| bot.send_message(uid, prompt_map[action]) |
| bot.answer_callback_query(call.id, "请在对话框输入新值") |
|
|
| def run_smart_backup_v2(latest_id, uid, src, tgt): |
| global TL_LOOP, TL_CLIENT |
| backup_key = f"{src}_to_{tgt}" |
| if backup_key not in DATA["backup_log"]: DATA["backup_log"][backup_key] = [] |
| if not TL_LOOP or not TL_CLIENT: return bot.send_message(uid, "❌ 错误: Userbot 未启动。") |
| bot.send_message(uid, f"🔍 **[双擎同步 1/2]**\n正在扫描历史...") |
|
|
| async def fetch_valid_tasks(): |
| messages = [] |
| ref_msg = await TL_CLIENT.get_messages(int(src), ids=latest_id) |
| target_grouped_id = ref_msg.grouped_id if ref_msg else None |
| async for msg in TL_CLIENT.iter_messages(int(src)): |
| if msg.action is not None: |
| continue |
| if msg.id > latest_id: |
| if target_grouped_id and msg.grouped_id == target_grouped_id: messages.append(msg) |
| continue |
| messages.append(msg) |
| messages.reverse() |
| tasks, album_cache = [], {} |
| for msg in messages: |
| if msg.grouped_id: |
| if msg.grouped_id not in album_cache: |
| album_cache[msg.grouped_id] = [] |
| tasks.append({'type': 'album', 'grouped_id': msg.grouped_id}) |
| album_cache[msg.grouped_id].append(msg.id) |
| else: tasks.append({'type': 'single', 'id': msg.id}) |
| return tasks, album_cache, len(messages) |
|
|
| try: |
| future = asyncio.run_coroutine_threadsafe(fetch_valid_tasks(), TL_LOOP) |
| tasks, album_cache, total_valid = future.result(timeout=300) |
| except Exception as e: return bot.send_message(uid, f"❌ 扫描失败: {e}") |
|
|
| bot.send_message(uid, f"✅ 锁定 **{total_valid}** 条有效消息。\n🚀 **[双擎同步 2/2]**\n开始搬运...") |
| success = 0 |
| failed = 0 |
| failed_ids = [] |
| for task in tasks: |
| msg_ids_to_copy = album_cache[task['grouped_id']] if task['type'] == 'album' else [task['id']] |
| if any(m_id in DATA["backup_log"][backup_key] for m_id in msg_ids_to_copy): continue |
| while True: |
| try: |
| if task['type'] == 'album': bot.copy_messages(tgt, src, msg_ids_to_copy) |
| else: bot.copy_message(tgt, src, msg_ids_to_copy[0]) |
| DATA["backup_log"][backup_key].extend(msg_ids_to_copy) |
| success += len(msg_ids_to_copy) |
| time.sleep(3.0 if task['type'] == 'album' else 1.5) |
| if success % 20 == 0: save_data() |
| break |
| except ApiTelegramException as e: |
| if e.error_code == 429: |
| time.sleep(e.result_json.get('parameters', {}).get('retry_after', 10)) |
| else: |
| failed += len(msg_ids_to_copy) |
| failed_ids.extend(msg_ids_to_copy) |
| break |
| except Exception: |
| failed += len(msg_ids_to_copy) |
| failed_ids.extend(msg_ids_to_copy) |
| break |
| save_data() |
| |
| report = f"🏁 **备份完成!**\n源: `{src}` ➡️ 目: `{tgt}`\n✅ 新增 **{success}** 条" |
| if failed > 0: |
| report += f"\n❌ 失败 **{failed}** 条" |
| show_ids = failed_ids[:10] |
| report += f"\n失败消息 ID: `{show_ids}`" |
| if len(failed_ids) > 10: |
| report += f"\n... 等共 {len(failed_ids)} 条" |
| bot.send_message(uid, report) |
| push_event(uid, "backup_done", f"✅ 备份完成,新增 {success} 条,失败 {failed} 条") |
|
|
| def generate_smart_directory(uid, ch_id): |
| global TL_LOOP, TL_CLIENT |
| if not TL_LOOP or not TL_CLIENT: return bot.send_message(uid, "❌ 错误: Userbot 未启动。") |
| async def scan_and_build(): |
| tags_set = set() |
| try: |
| async for msg in TL_CLIENT.iter_messages(int(ch_id)): |
| if msg.raw_text and '#' in msg.raw_text: |
| if not msg.entities: clean_text = msg.raw_text |
| else: |
| html_text = tl_html.unparse(msg.raw_text, msg.entities) |
| clean_text = html.unescape(re.sub(r'<.*?>', '', re.sub(r'<blockquote.*?>.*?</blockquote>', '', html_text, flags=re.DOTALL))) |
| for t in re.findall(r'#[A-Za-z0-9_\u4e00-\u9fa5]+', clean_text): tags_set.add(t) |
| except Exception as e: return None, str(e) |
| if not tags_set: return None, "没有找到有效标签。" |
| directory_map = {} |
| for tag in tags_set: |
| clean_str = tag[1:] |
| if not clean_str: continue |
| fc = clean_str[0] |
| key = "#" |
| if fc.isalpha() and fc.isascii(): key = fc.upper() |
| elif fc.isdigit(): key = "0-9" |
| elif '\u4e00' <= fc <= '\u9fff': |
| try: |
| py = lazy_pinyin(fc) |
| if py and len(py[0])>0: key = py[0][0].upper() |
| except NameError: key = "中文" |
| if key not in directory_map: directory_map[key] = [] |
| directory_map[key].append(tag) |
| return directory_map, None |
|
|
| try: |
| future = asyncio.run_coroutine_threadsafe(scan_and_build(), TL_LOOP) |
| directory_map, err = future.result(timeout=600) |
| except Exception as e: return bot.send_message(uid, f"❌ 扫描崩溃: {e}") |
| if err: return bot.send_message(uid, f"❌ 扫描失败: {err}") |
|
|
| lines = ["目录:\n<blockquote expandable>"] |
| keys = sorted(directory_map.keys()) |
| if "0-9" in keys: keys.remove("0-9"); keys.insert(0, "0-9") |
| for key in keys: |
| tags_line = " ".join([html.escape(t) for t in sorted(directory_map[key])]) |
| lines.append(f"{key}: {tags_line}\n") |
| lines.append("</blockquote>") |
| final_text = "\n".join(lines) |
| if len(final_text) > 4000: |
| bot.send_message(uid, "⚠️ 目录太长已截断。") |
| final_text = final_text[:4000] + "\n... </blockquote>" |
| try: |
| bot.send_message(uid, final_text, parse_mode="HTML") |
| bot.send_message(uid, "✅ 手动目录生成完毕!") |
| push_event(uid, "dir_done", "✅ 手动目录生成完毕") |
| except Exception as e: bot.send_message(uid, f"❌ 发送失败: {e}") |
|
|
| def batch_replace_tags(uid, ch_id, old_tag, new_tag): |
| global TL_LOOP, TL_CLIENT |
| if not TL_LOOP or not TL_CLIENT: return bot.send_message(uid, "❌ 错误: Userbot 未启动。") |
| async def do_replace(): |
| success = 0 |
| try: |
| pattern = re.compile(re.escape(old_tag) + r'(?![A-Za-z0-9_\u4e00-\u9fa5])') |
| async for msg in TL_CLIENT.iter_messages(int(ch_id)): |
| if msg.raw_text and old_tag in msg.raw_text: |
| html_text = tl_html.unparse(msg.raw_text, msg.entities) |
| if pattern.search(html_text): |
| new_html_text = pattern.sub(new_tag, html_text) |
| while True: |
| try: |
| if msg.photo or msg.video or msg.document: bot.edit_message_caption(caption=new_html_text, chat_id=ch_id, message_id=msg.id, parse_mode="HTML") |
| else: bot.edit_message_text(text=new_html_text, chat_id=ch_id, message_id=msg.id, parse_mode="HTML") |
| success += 1 |
| await asyncio.sleep(2) |
| break |
| except ApiTelegramException as e: |
| if e.error_code == 429: await asyncio.sleep(e.result_json.get('parameters', {}).get('retry_after', 10)) |
| else: break |
| except Exception: break |
| except Exception as e: return success, str(e) |
| return success, None |
| try: |
| future = asyncio.run_coroutine_threadsafe(do_replace(), TL_LOOP) |
| success, err = future.result(timeout=3600) |
| except Exception as e: return bot.send_message(uid, f"❌ 任务崩溃: {e}") |
| if err: bot.send_message(uid, f"⚠️ 有警告: {err}\n✅ 仍修改了 **{success}** 条消息!") |
| else: bot.send_message(uid, f"🏁 **修改完成!**\n修改了 **{success}** 条历史消息!") |
| push_event(uid, "replace_done", f"✅ 标签替换完成,修改 {success} 条") |
|
|
| def process_media_group(mg_id, targets): |
| with mg_lock: |
| if mg_id not in media_group_cache: return |
| msgs = media_group_cache.pop(mg_id) |
| msgs.sort(key=lambda x: x.message_id) |
| for tgt in targets: |
| media_list = [] |
| for m in msgs: |
| is_spoiler = getattr(m, 'has_media_spoiler', False) |
| if m.photo: media_list.append(types.InputMediaPhoto(m.photo[-1].file_id, caption=m.caption, caption_entities=m.caption_entities, has_spoiler=is_spoiler)) |
| elif m.video: media_list.append(types.InputMediaVideo(m.video.file_id, caption=m.caption, caption_entities=m.caption_entities, has_spoiler=is_spoiler)) |
| elif m.document: media_list.append(types.InputMediaDocument(m.document.file_id, caption=m.caption, caption_entities=m.caption_entities)) |
| elif m.audio: media_list.append(types.InputMediaAudio(m.audio.file_id, caption=m.caption, caption_entities=m.caption_entities)) |
| if not media_list: continue |
| try: |
| sent_msgs = bot.send_media_group(tgt, media_list) |
| for src_m, sent_m in zip(msgs, sent_msgs): |
| src_mid = str(src_m.message_id) |
| if src_mid not in DATA["msg_map"]: DATA["msg_map"][src_mid] = {} |
| DATA["msg_map"][src_mid][tgt] = sent_m.message_id |
| except Exception: pass |
| save_data() |
|
|
| @bot.channel_post_handler(func=lambda m: True, content_types=ALL_TYPES) |
| def handle_post(message): |
| src_id = str(message.chat.id) |
| targets = [g["tgt"] for u in DATA["users"].values() for g in u["groups"] if str(g["src"]) == src_id] |
| if not targets: return |
| if message.media_group_id: |
| mg_id = message.media_group_id |
| with mg_lock: |
| if mg_id not in media_group_cache: |
| media_group_cache[mg_id] = [] |
| Timer(2.0, process_media_group, args=(mg_id, targets)).start() |
| media_group_cache[mg_id].append(message) |
| return |
| src_mid = str(message.message_id) |
| if src_mid not in DATA["msg_map"]: DATA["msg_map"][src_mid] = {} |
| for tgt in targets: |
| try: |
| copied = bot.copy_message(tgt, message.chat.id, message.message_id) |
| DATA["msg_map"][src_mid][tgt] = copied.message_id |
| except Exception: pass |
| save_data() |
|
|
| @bot.edited_channel_post_handler(func=lambda m: True, content_types=ALL_TYPES) |
| def handle_edit(message): |
| src_mid = str(message.message_id) |
| if src_mid in DATA["msg_map"]: |
| for tgt, dst_id in DATA["msg_map"][src_mid].items(): |
| try: |
| if message.content_type == 'text': bot.edit_message_text(message.text, tgt, dst_id, entities=message.entities) |
| else: bot.edit_message_caption(caption=message.caption, chat_id=tgt, message_id=dst_id, caption_entities=message.caption_entities) |
| except Exception: pass |
|
|
| |
| def start_telethon_worker(): |
| global TL_LOOP, TL_CLIENT |
| api_id_str = os.environ.get("API_ID") |
| api_hash = os.environ.get("API_HASH") |
| user_session = os.environ.get("USER_SESSION") |
| if not api_id_str or not user_session: |
| print("⚠️ 未检测到 USER_SESSION,双擎模块跳过。") |
| return |
| TL_LOOP = asyncio.new_event_loop() |
| asyncio.set_event_loop(TL_LOOP) |
| TL_CLIENT = TelegramClient(StringSession(user_session), int(api_id_str), api_hash) |
|
|
| async def update_channel_msg(): |
| current_time = int(time.time()) |
| data_changed = False |
| for uid, u_data in DATA.get("users", {}).items(): |
| tasks = u_data.get("stats_tasks", []) |
| for i in range(len(tasks) - 1, -1, -1): |
| task = tasks[i] |
| ch_id = int(task['channel_id']) |
| try: msg_id = int(str(task['msg_id']).split('/')[-1]) |
| except ValueError: continue |
| table_title = task.get('table_title', '📊 **热评榜**') |
| top_n = int(task.get('top_n', 10)) |
| trigger_tag = task.get('trigger_tag', '#未设置') |
| completed_items = [] |
| interval_sec = int(task.get('interval', 60)) * 60 |
| |
| if current_time > int(task.get('start_time', current_time)) + int(task.get('duration', 7)) * 86400: |
| del tasks[i]; data_changed = True; continue |
| if current_time - int(task.get('last_run', 0)) < interval_sec: continue |
| |
| try: |
| original_msg = await TL_CLIENT.get_messages(ch_id, ids=msg_id) |
| if not original_msg: continue |
| if original_msg.raw_text: |
| raw_html = tl_html.unparse(original_msg.raw_text, original_msg.entities) |
| base_html = raw_html.split("➖➖➖➖➖➖")[0].rstrip() if "➖➖➖➖➖➖" in raw_html else raw_html.rstrip() |
| else: base_html = "" |
| |
| comments_data_list = [] |
| discussion_chat_id = None |
| thread_id = None |
| |
| async for comment in TL_CLIENT.iter_messages(ch_id, reply_to=msg_id): |
| if not discussion_chat_id: |
| discussion_chat_id = comment.chat_id |
| if comment.reply_to: |
| thread_id = comment.reply_to.reply_to_top_id or comment.reply_to.reply_to_msg_id |
| |
| if comment.reactions: |
| total_reacts = sum(r.count for r in comment.reactions.results) |
| if total_reacts > 0: |
| full_raw_text = (comment.raw_text if comment.raw_text else "[图片]").replace('\n', ' ') |
| short_text = full_raw_text |
| if len(short_text) > 15: short_text = short_text[:14] + "…" |
| comments_data_list.append((total_reacts, html.escape(short_text), short_text, full_raw_text, comment.id)) |
| |
| |
| stats_blacklist = task.get('stats_blacklist', []) |
| blacklist_section = "" |
| if stats_blacklist: |
| bl_title = task.get('blacklist_title', '🚫本月轮换限制:') |
| bl_names = "\n".join([html.escape(n) for n in stats_blacklist]) |
| if bl_title: |
| blacklist_section = f"\n{html.escape(bl_title)}\n<blockquote>{bl_names}</blockquote>\n" |
| else: |
| blacklist_section = f"\n<blockquote>{bl_names}</blockquote>\n" |
|
|
| stats_section = "" |
| all_comments_for_file = [] |
| |
| if comments_data_list: |
| comments_data_list.sort(key=lambda x: x[0], reverse=True) |
| |
| deduped_comments = [] |
| for item in comments_data_list: |
| total, safe_text, raw_short, full_raw, c_id = item |
| base_name = re.split(r'[((]', full_raw)[0].strip() |
| |
| is_blocked = False |
| for blocked in stats_blacklist: |
| if blocked and len(blocked) >= 2: |
| if blocked in full_raw or blocked in base_name: |
| is_blocked = True |
| break |
| if is_blocked: |
| continue |
| |
| conflict = False |
| for added_item in deduped_comments: |
| added_full_raw = added_item[3] |
| added_base = re.split(r'[((]', added_full_raw)[0].strip() |
| if (len(base_name) >= 2 and base_name in added_full_raw) or (len(added_base) >= 2 and added_base in full_raw): |
| conflict = True; break |
| if not conflict: deduped_comments.append(item) |
| |
| comments_data_list = deduped_comments |
| completed_items = [] |
| |
| try: |
| async for newer_msg in TL_CLIENT.iter_messages(ch_id, limit=100, min_id=msg_id): |
| if newer_msg.id == msg_id: continue |
| if not newer_msg.raw_text or trigger_tag.lower() not in newer_msg.raw_text.lower(): continue |
| |
| if newer_msg.entities: |
| html_text = tl_html.unparse(newer_msg.raw_text, newer_msg.entities) |
| text_no_bq = re.sub(r'<blockquote.*?>.*?</blockquote>', '', html_text, flags=re.DOTALL) |
| clean_text = html.unescape(re.sub(r'<.*?>', '', text_no_bq)) |
| else: |
| clean_text = newer_msg.raw_text |
| |
| if trigger_tag.lower() not in clean_text.lower(): continue |
| found_tags = re.findall(r'#([A-Za-z0-9_\u4e00-\u9fa5]+)', clean_text) |
| |
| for item in comments_data_list: |
| raw_short = item[2] |
| full_raw = item[3] |
| base_name = re.split(r'[((]', full_raw)[0].strip() |
| for tag in found_tags: |
| if (len(tag) >= 2 and tag.lower() in full_raw.lower()) or (len(base_name) >= 2 and base_name.lower() in tag.lower()): |
| if raw_short not in completed_items: completed_items.append(raw_short) |
| break |
| except Exception: pass |
| |
| if task.get('completed_items') != completed_items: |
| task['completed_items'] = completed_items; data_changed = True |
| |
| comments_data_list.sort(key=lambda x: (x[2] in completed_items, x[0]), reverse=True) |
| all_comments_for_file = comments_data_list.copy() |
| |
| comments_data_list = comments_data_list[:top_n] |
| completed_count = sum(1 for item in comments_data_list if item[2] in completed_items) |
| max_digits = max([len(str(item[0])) for item in comments_data_list] + [1]) |
| |
| inner_lines = [] |
| for rank, item in enumerate(comments_data_list): |
| total, safe_text, raw_short, full_raw, c_id = item |
| medal = "🥇" if rank == 0 else "🥈" if rank == 1 else "🥉" if rank == 2 else "🔸" |
| display_text = f"<s>{safe_text}</s>" if raw_short in completed_items else safe_text |
| padded_total = str(total).rjust(max_digits, ' ') |
| inner_lines.append(f"{medal} <code>{padded_total}</code> 赞 | <i>{display_text}</i>") |
| |
| beijing_tz = timezone(timedelta(hours=8)) |
| now_str = datetime.now(beijing_tz).strftime("%m-%d %H:%M") |
| inner_lines.append(f"\n⏳ <code>最后更新: {now_str} (北京时间)</code>") |
| |
| |
| if all_comments_for_file: |
| c_chat_str = str(discussion_chat_id).replace("-100", "") if discussion_chat_id else "" |
| list_html = "" |
| for rank, item in enumerate(all_comments_for_file, 1): |
| total, safe_text, raw_short, full_raw, c_id = item |
| |
| if c_chat_str and thread_id: |
| link = f"tg://privatepost?channel={c_chat_str}&post={c_id}&thread={thread_id}" |
| elif c_chat_str: |
| link = f"tg://privatepost?channel={c_chat_str}&post={c_id}" |
| else: |
| link = "#" |
| |
| |
| display_name = html.escape(full_raw) |
| if raw_short in completed_items: |
| display_name = f"<s style='opacity: 0.5;'>{display_name}</s>" |
| |
| |
| list_html += f'<a href="{link}" class="item"><span class="rank">#{rank}</span><span class="name">{display_name}</span><span class="reacts">{total} 赞</span></a>' |
| |
| html_template = f"""<!DOCTYPE html> |
| <html lang="zh-CN"> |
| <head> |
| <meta charset="UTF-8"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| <title>完整榜单 - {html.escape(table_title)}</title> |
| <style> |
| body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; background: #1a1a2e; color: #eaeaea; padding: 15px; margin: 0; }} |
| .container {{ max-width: 600px; margin: 0 auto; background: #16213e; border-radius: 12px; padding: 15px; box-shadow: 0 4px 15px rgba(0,0,0,0.3); }} |
| h2 {{ text-align: center; color: #fff; margin-bottom: 20px; font-size: 18px; border-bottom: 1px solid rgba(255,255,255,0.1); padding-bottom: 10px; line-height: 1.5; }} |
| .hint {{ text-align: center; color: #5dade2; font-size: 13px; margin-top: -10px; margin-bottom: 15px; font-weight: 500; }} |
| .item {{ display: flex; justify-content: space-between; align-items: center; padding: 12px 0; border-bottom: 1px solid rgba(255,255,255,0.05); text-decoration: none; color: inherit; transition: background 0.2s; }} |
| .item:last-child {{ border-bottom: none; }} |
| .item:active {{ background: rgba(255,255,255,0.05); border-radius: 8px; }} |
| .rank {{ font-weight: bold; width: 35px; color: #e94560; font-size: 14px; }} |
| .name {{ flex: 1; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; padding: 0 10px; font-size: 14px; }} |
| .reacts {{ font-weight: bold; color: #5dade2; font-size: 14px; }} |
| </style> |
| </head> |
| <body> |
| <div class="container"> |
| <h2>📊 {html.escape(table_title)} <br><span style="font-size:12px;color:#8a8a9a;font-weight:normal">全量数据收录 | 更新于 {now_str}</span></h2> |
| <div class="hint">💡 点击名字跳转评论投票</div> |
| {list_html} |
| </div> |
| </body> |
| </html>""" |
| |
| cache_key = f"{ch_id}_{msg_id}" |
| HTML_CACHE[cache_key] = html_template |
| |
| |
| space_host = "bangdan.nine7.cc.cd" |
| file_msg_link = f"https://{space_host}/list/{cache_key}" |
|
|
| |
| stats_section = f"<b>{html.escape(table_title)} ({completed_count}/{top_n}) <a href='{file_msg_link}'>完整名单</a></b>\n<blockquote>{chr(10).join(inner_lines)}</blockquote>" |
| |
| else: |
| beijing_tz = timezone(timedelta(hours=8)) |
| now_str = datetime.now(beijing_tz).strftime("%m-%d %H:%M") |
| stats_section = f"<b>{html.escape(table_title)} (0/{top_n})</b>\n<blockquote>暂无评论数据\n\n⏳ <code>最后更新: {now_str} (北京时间)</code></blockquote>" |
|
|
| new_message_text = f"{base_html}{SEPARATOR_MARK}{blacklist_section}{stats_section}" |
| content_hash = f"{blacklist_section}|{stats_section}" |
| |
| if task.get('last_html_stats') != content_hash: |
| try: |
| if original_msg.photo or original_msg.video or original_msg.document: bot.edit_message_caption(caption=new_message_text, chat_id=ch_id, message_id=msg_id, parse_mode="HTML") |
| else: bot.edit_message_text(text=new_message_text, chat_id=ch_id, message_id=msg_id, parse_mode="HTML") |
| task['last_html_stats'] = content_hash; data_changed = True |
| except Exception: pass |
|
|
| task['last_run'] = current_time; data_changed = True; await asyncio.sleep(2) |
| except Exception as e: |
| pass |
| if data_changed: await asyncio.to_thread(save_data) |
|
|
| async def update_channel_dirs(): |
| current_time = int(time.time()) |
| data_changed = False |
| for uid, u_data in DATA.get("users", {}).items(): |
| tasks = u_data.get("dir_tasks", []) |
| for task in tasks: |
| ch_id = int(task['channel_id']) |
| msg_id = int(task['msg_id']) |
| blacklist = task.get('blacklist', []) |
| tags_map = task.get('tags_map', {}) |
| scanned_msgs = task.get('scanned_msgs', {}) |
| interval_sec = int(task.get('interval', 15)) * 60 |
| if current_time - int(task.get('last_run', 0)) < interval_sec: continue |
| is_first_run = not bool(scanned_msgs) |
| scan_kwargs = {'limit': None if is_first_run else 150} |
| new_tags_found = False |
| try: |
| original_msg = await TL_CLIENT.get_messages(ch_id, ids=msg_id) |
| if not original_msg: continue |
| if original_msg.raw_text: |
| raw_html = tl_html.unparse(original_msg.raw_text, original_msg.entities) |
| base_html = raw_html.split("➖➖➖➖➖➖")[0].rstrip() if "➖➖➖➖➖➖" in raw_html else raw_html.rstrip() |
| else: base_html = "" |
| except Exception: continue |
| try: |
| async for msg in TL_CLIENT.iter_messages(ch_id, **scan_kwargs): |
| if msg.id == msg_id: continue |
| msg_id_str = str(msg.id) |
| msg_time = msg.edit_date.timestamp() if msg.edit_date else msg.date.timestamp() |
| if scanned_msgs.get(msg_id_str) == msg_time: continue |
| scanned_msgs[msg_id_str] = msg_time |
| data_changed = True |
| found_tags = [] |
| if msg.raw_text and '#' in msg.raw_text: |
| if not msg.entities: clean_text = msg.raw_text |
| else: |
| html_text = tl_html.unparse(msg.raw_text, msg.entities) |
| text_without_bq = re.sub(r'<blockquote.*?>.*?</blockquote>', '', html_text, flags=re.DOTALL) |
| clean_text = html.unescape(re.sub(r'<.*?>', '', text_without_bq)) |
| found_tags = re.findall(r'#[A-Za-z0-9_\u4e00-\u9fa5]+', clean_text) |
| if found_tags: |
| tags_map[msg_id_str] = found_tags |
| new_tags_found = True |
| else: |
| if msg_id_str in tags_map: |
| del tags_map[msg_id_str] |
| new_tags_found = True |
| task['last_run'] = current_time |
| if new_tags_found or is_first_run: |
| task['tags_map'] = tags_map |
| task['scanned_msgs'] = scanned_msgs |
| data_changed = True |
| all_tags = set() |
| for t_list in tags_map.values(): all_tags.update(t_list) |
| active_tags = [t for t in all_tags if t not in blacklist] |
| task['tags_cache'] = active_tags |
| directory_map = {} |
| for tag in active_tags: |
| clean_str = tag[1:] |
| if not clean_str: continue |
| fc = clean_str[0] |
| key = "#" |
| if fc.isalpha() and fc.isascii(): key = fc.upper() |
| elif fc.isdigit(): key = "0-9" |
| elif '\u4e00' <= fc <= '\u9fff': |
| try: |
| py = lazy_pinyin(fc) |
| if py and len(py[0])>0: key = py[0][0].upper() |
| except NameError: key = "中文" |
| if key not in directory_map: directory_map[key] = [] |
| directory_map[key].append(tag) |
| lines = ["目录:\n<blockquote expandable>"] |
| keys = sorted(directory_map.keys()) |
| if "0-9" in keys: keys.remove("0-9"); keys.insert(0, "0-9") |
| for key in keys: |
| tags_line = " ".join([html.escape(t) for t in sorted(directory_map[key])]) |
| lines.append(f"{key}: {tags_line}\n") |
| lines.append("</blockquote>") |
| beijing_tz = timezone(timedelta(hours=8)) |
| now_str = datetime.now(beijing_tz).strftime("%m-%d %H:%M") |
| lines.append(f"\n⏳ <code>最后更新: {now_str} (北京时间)</code>") |
| stats_text = f"<blockquote>{chr(10).join(lines)}</blockquote>" |
| task_name = task.get('task_name', '标签目录') |
| safe_title_with_count = f"{html.escape(task_name)} ({len(active_tags)})" |
| new_message_text = f"{base_html}{SEPARATOR_MARK}<b>{safe_title_with_count}</b>\n{stats_text}" |
| if len(new_message_text) > 4000: new_message_text = new_message_text[:4000] + "\n... </blockquote>\n⚠️ 目录过长已截断" |
| if task.get('last_html_dir') != stats_text: |
| try: |
| if original_msg.photo or original_msg.video or original_msg.document: bot.edit_message_caption(caption=new_message_text, chat_id=ch_id, message_id=msg_id, parse_mode="HTML") |
| else: bot.edit_message_text(text=new_message_text, chat_id=ch_id, message_id=msg_id, parse_mode="HTML") |
| task['last_html_dir'] = stats_text |
| data_changed = True |
| except Exception: pass |
| except Exception: pass |
| if data_changed: await asyncio.to_thread(save_data) |
|
|
| TL_CLIENT.start() |
| scheduler = AsyncIOScheduler(event_loop=TL_LOOP) |
| scheduler.add_job(update_channel_msg, 'interval', seconds=10) |
| scheduler.add_job(update_channel_dirs, 'interval', seconds=15) |
| scheduler.start() |
| print("🚀 Telethon [全能排版+智能双擎后台] 已启动!") |
| TL_LOOP.run_until_complete(TL_CLIENT.run_until_disconnected()) |
|
|
| |
| app = Flask(__name__) |
|
|
| @app.after_request |
| def add_security_headers(response): |
| response.headers['X-Frame-Options'] = 'ALLOW-FROM https://web.telegram.org' |
| response.headers['Content-Security-Policy'] = "frame-ancestors 'self' https://web.telegram.org https://*.telegram.org;" |
| return response |
|
|
| def validate_webapp(req): |
| init_data = req.headers.get('X-Init-Data', '') |
| if not init_data: |
| return None |
| try: |
| parsed = {} |
| for part in init_data.split('&'): |
| if '=' in part: |
| k, v = part.split('=', 1) |
| parsed[k] = unquote(v) |
| check_hash = parsed.pop('hash', None) |
| if not check_hash or not BOT_TOKEN: |
| return None |
| data_check_string = "\n".join(f"{k}={parsed[k]}" for k in sorted(parsed.keys())) |
| secret_key = hmac.new(b"WebAppData", BOT_TOKEN.encode(), hashlib.sha256).digest() |
| computed = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest() |
| if not hmac.compare_digest(computed, check_hash): |
| print("⛔ HMAC 签名不匹配") |
| return None |
| auth_date = int(parsed.get('auth_date', 0)) |
| if abs(time.time() - auth_date) > 86400: |
| print("⛔ initData 已过期") |
| return None |
| user_obj = json.loads(parsed.get('user', '{}')) |
| uid = str(user_obj.get('id', '')) |
| if uid and uid not in DATA["users"]: |
| DATA["users"][uid] = {"groups": [], "stats_tasks": [], "dir_tasks": [], "address_book": {}} |
| save_data() |
| return uid if uid else None |
| except Exception as e: |
| print(f"❌ WebApp 鉴权错误: {e}") |
| return None |
|
|
| def need_auth(f): |
| @wraps(f) |
| def wrapper(*args, **kwargs): |
| uid = validate_webapp(request) |
| if not uid: |
| return jsonify({"ok": False, "msg": "未授权"}), 401 |
| if check_rate_limit(uid): |
| return jsonify({"ok": False, "msg": "操作太频繁,请稍后再试"}), 429 |
| return f(uid, *args, **kwargs) |
| return wrapper |
|
|
| @app.route('/') |
| def home(): |
| return "Bot is running" |
|
|
| @app.route('/webapp') |
| def webapp_page(): |
| return send_file('webapp.html') |
|
|
| |
| @app.route('/list/<cache_key>') |
| def view_list(cache_key): |
| html_content = HTML_CACHE.get(cache_key) |
| if not html_content: |
| return "暂无数据或页面已刷新,请等待机器人下次更新", 404 |
| return html_content |
|
|
| @app.route('/api/data') |
| @need_auth |
| def api_get_data(uid): |
| user = DATA["users"].get(uid, {}) |
| return jsonify({ |
| "ok": True, "user": user, |
| "msg_count": len(DATA.get("msg_map", {})), |
| "webdav_url": FULL_WEBDAV_URL, |
| "userbot": TL_CLIENT is not None and TL_LOOP is not None |
| }) |
|
|
| @app.route('/api/groups', methods=['POST']) |
| @need_auth |
| def api_add_group(uid): |
| d = request.json |
| DATA["users"][uid].setdefault("groups", []).append({"src": d["src"], "tgt": d["tgt"]}) |
| save_data() |
| return jsonify({"ok": True, "user": DATA["users"][uid]}) |
|
|
| @app.route('/api/groups/<int:idx>', methods=['DELETE']) |
| @need_auth |
| def api_del_group(uid, idx): |
| try: |
| del DATA["users"][uid]["groups"][idx] |
| save_data() |
| return jsonify({"ok": True, "user": DATA["users"][uid]}) |
| except: return jsonify({"ok": False, "msg": "索引无效"}) |
|
|
| @app.route('/api/channels', methods=['POST']) |
| @need_auth |
| def api_add_channel(uid): |
| d = request.json |
| DATA["users"][uid].setdefault("address_book", {})[d["id"]] = d["name"] |
| save_data() |
| return jsonify({"ok": True, "user": DATA["users"][uid]}) |
|
|
| @app.route('/api/channels/<path:cid>', methods=['DELETE']) |
| @need_auth |
| def api_del_channel(uid, cid): |
| ab = DATA["users"][uid].get("address_book", {}) |
| if cid in ab: |
| del ab[cid] |
| save_data() |
| return jsonify({"ok": True, "user": DATA["users"][uid]}) |
|
|
| @app.route('/api/stats', methods=['POST']) |
| @need_auth |
| def api_add_stat(uid): |
| d = request.json |
| msg_id_raw = str(d.get("msg_id", "")) |
| if msg_id_raw.startswith('http'): msg_id_raw = msg_id_raw.split('/')[-1] |
| DATA["users"][uid].setdefault("stats_tasks", []).append({ |
| "task_name": d["task_name"], "channel_id": d["channel_id"], "msg_id": msg_id_raw, |
| "table_title": d.get("table_title", "📊 统计"), "top_n": int(d.get("top_n", 10)), |
| "trigger_tag": d.get("trigger_tag", "#更新"), "interval": int(d.get("interval", 15)), |
| "duration": int(d.get("duration", 7)), "start_time": int(time.time()), |
| "last_run": 0, "completed_items": [], |
| "last_checked_msg_id": int(msg_id_raw) if msg_id_raw.isdigit() else 0, |
| "stats_blacklist": [x.strip() for x in re.split(r'[\s\n]+', d.get("stats_blacklist", "")) if x.strip()], |
| "blacklist_title": d.get("blacklist_title", "🚫本月轮换限制:") |
| }) |
| save_data() |
| return jsonify({"ok": True, "user": DATA["users"][uid]}) |
|
|
| @app.route('/api/stats/<int:idx>', methods=['PUT']) |
| @need_auth |
| def api_edit_stat(uid, idx): |
| d = request.json |
| field, val = d["field"], d["value"] |
| try: |
| task = DATA["users"][uid]["stats_tasks"][idx] |
| if field in ("top_n", "interval", "duration"): |
| task[field] = int(val) |
| elif field == "msg_id": |
| if val.startswith('http'): val = val.split('/')[-1] |
| task["msg_id"] = val |
| task['last_html_stats'] = "" |
| task['last_checked_msg_id'] = int(val) if val.isdigit() else 0 |
| task['completed_items'] = [] |
| if "file_msg_id" in task: del task["file_msg_id"] |
| elif field == "add_stats_bl": |
| task.setdefault("stats_blacklist", []) |
| new_items = [x.strip() for x in re.split(r'[\s\n]+', val) if x.strip()] |
| task["stats_blacklist"].extend(new_items) |
| task["stats_blacklist"] = list(set(task["stats_blacklist"])) |
| task["last_html_stats"] = "" |
| elif field == "rm_stats_bl": |
| to_remove = [x.strip() for x in re.split(r'[\s\n]+', val) if x.strip()] |
| task["stats_blacklist"] = [x for x in task.get("stats_blacklist", []) if x not in to_remove] |
| task["last_html_stats"] = "" |
| elif field == "blacklist_title": |
| task["blacklist_title"] = "" if val.strip() == "无" else val.strip() |
| task["last_html_stats"] = "" |
| else: |
| task[field] = val |
| |
| task["last_html_stats"] = "" |
| task["last_run"] = 0 |
| |
| save_data() |
| return jsonify({"ok": True, "user": DATA["users"][uid]}) |
| except Exception as e: |
| return jsonify({"ok": False, "msg": str(e)}) |
|
|
| @app.route('/api/stats/<int:idx>', methods=['DELETE']) |
| @need_auth |
| def api_del_stat(uid, idx): |
| try: |
| del DATA["users"][uid]["stats_tasks"][idx] |
| save_data() |
| return jsonify({"ok": True, "user": DATA["users"][uid]}) |
| except: return jsonify({"ok": False, "msg": "索引无效"}) |
|
|
| @app.route('/api/dirs', methods=['POST']) |
| @need_auth |
| def api_add_dir(uid): |
| d = request.json |
| msg_id_raw = str(d.get("msg_id", "")) |
| if msg_id_raw.startswith('http'): msg_id_raw = msg_id_raw.split('/')[-1] |
| DATA["users"][uid].setdefault("dir_tasks", []).append({ |
| "task_name": d["task_name"], "channel_id": d["channel_id"], "msg_id": msg_id_raw, |
| "blacklist": d.get("blacklist", []), "interval": 15, |
| "tags_cache": [], "tags_map": {}, "scanned_msgs": {}, "last_html_dir": "" |
| }) |
| save_data() |
| return jsonify({"ok": True, "user": DATA["users"][uid]}) |
|
|
| @app.route('/api/dirs/<int:idx>', methods=['PUT']) |
| @need_auth |
| def api_edit_dir(uid, idx): |
| d = request.json |
| field, val = d["field"], d["value"] |
| try: |
| task = DATA["users"][uid]["dir_tasks"][idx] |
| if field == "add_blacklist": |
| task["blacklist"].extend(val.split()) |
| task["blacklist"] = list(set(task["blacklist"])) |
| elif field == "rm_blacklist": |
| to_rem = val.split() |
| task["blacklist"] = [t for t in task["blacklist"] if t not in to_rem] |
| elif field == "interval": |
| task["interval"] = int(val) |
| save_data() |
| return jsonify({"ok": True, "user": DATA["users"][uid]}) |
| except Exception as e: |
| return jsonify({"ok": False, "msg": str(e)}) |
|
|
| @app.route('/api/dirs/<int:idx>', methods=['DELETE']) |
| @need_auth |
| def api_del_dir(uid, idx): |
| try: |
| del DATA["users"][uid]["dir_tasks"][idx] |
| save_data() |
| return jsonify({"ok": True, "user": DATA["users"][uid]}) |
| except: return jsonify({"ok": False, "msg": "索引无效"}) |
|
|
| @app.route('/api/btn_new', methods=['POST']) |
| @need_auth |
| def api_btn_new(uid): |
| d = request.json |
| markup = types.InlineKeyboardMarkup() |
| markup.add(types.InlineKeyboardButton(text=d["btn_text"], url=d["url"])) |
| try: |
| bot.send_message(d["ch_id"], d["text"], reply_markup=markup, parse_mode="HTML") |
| return jsonify({"ok": True}) |
| except Exception as e: |
| try: |
| bot.send_message(d["ch_id"], d["text"], reply_markup=markup) |
| return jsonify({"ok": True}) |
| except Exception as e2: |
| return jsonify({"ok": False, "msg": str(e2)}) |
|
|
| @app.route('/api/btn_old', methods=['POST']) |
| @need_auth |
| def api_btn_old(uid): |
| d = request.json |
| msg_id = d["msg_id"] |
| if msg_id.startswith('http'): msg_id = msg_id.split('/')[-1] |
| try: |
| if d["btn_text"].strip() == "删除": |
| bot.edit_message_reply_markup(chat_id=d["ch_id"], message_id=int(msg_id), reply_markup=None) |
| else: |
| markup = types.InlineKeyboardMarkup() |
| markup.add(types.InlineKeyboardButton(text=d["btn_text"], url=d["url"])) |
| bot.edit_message_reply_markup(chat_id=d["ch_id"], message_id=int(msg_id), reply_markup=markup) |
| return jsonify({"ok": True}) |
| except Exception as e: |
| return jsonify({"ok": False, "msg": str(e)}) |
|
|
| @app.route('/api/btn_multi', methods=['POST']) |
| @need_auth |
| def api_btn_multi(uid): |
| d = request.json |
| markup = types.InlineKeyboardMarkup(row_width=1) |
| for b in d["buttons"]: |
| markup.add(types.InlineKeyboardButton(text=b["text"], url=b["url"])) |
| try: |
| bot.send_message(d["ch_id"], d["text"], reply_markup=markup, parse_mode="HTML") |
| return jsonify({"ok": True}) |
| except Exception as e: |
| try: |
| bot.send_message(d["ch_id"], d["text"], reply_markup=markup) |
| return jsonify({"ok": True}) |
| except Exception as e2: |
| return jsonify({"ok": False, "msg": str(e2)}) |
|
|
| @app.route('/api/gen_dir', methods=['POST']) |
| @need_auth |
| def api_gen_dir(uid): |
| d = request.json |
| Thread(target=generate_smart_directory, args=(uid, d["ch_id"])).start() |
| return jsonify({"ok": True}) |
|
|
| @app.route('/api/replace_tag', methods=['POST']) |
| @need_auth |
| def api_replace_tag(uid): |
| d = request.json |
| new_tag = "" if d.get("new_tag") == "删除" else d.get("new_tag", "") |
| Thread(target=batch_replace_tags, args=(uid, d["ch_id"], d["old_tag"], new_tag)).start() |
| return jsonify({"ok": True}) |
|
|
| @app.route('/api/backup', methods=['POST']) |
| @need_auth |
| def api_backup(uid): |
| d = request.json |
| try: |
| latest_id = int(d["link"].strip().split('/')[-1]) |
| Thread(target=run_smart_backup_v2, args=(latest_id, uid, d["src"], d["tgt"])).start() |
| return jsonify({"ok": True}) |
| except Exception as e: |
| return jsonify({"ok": False, "msg": str(e)}) |
|
|
| |
| @app.route('/api/events') |
| def api_events(): |
| class FakeReq: |
| def __init__(self, init_data_str): |
| self.headers = {'X-Init-Data': init_data_str} |
|
|
| init_data_str = request.args.get('init_data', '') |
| uid = validate_webapp(FakeReq(init_data_str)) |
|
|
| if not uid: |
| return jsonify({"ok": False, "msg": "未授权"}), 401 |
|
|
| def stream(): |
| q = _event_queues[uid] |
| yield f"data: {json.dumps({'type': 'connected', 'data': '🟢 实时连接已建立'})}\n\n" |
| while True: |
| try: |
| event = q.get(timeout=30) |
| yield f"data: {json.dumps(event)}\n\n" |
| except queue.Empty: |
| yield f": heartbeat\n\n" |
|
|
| return Response( |
| stream(), |
| mimetype='text/event-stream', |
| headers={ |
| 'Cache-Control': 'no-cache', |
| 'X-Accel-Buffering': 'no', |
| 'Connection': 'keep-alive' |
| } |
| ) |
|
|
| @app.route('/api/health') |
| def api_health(): |
| return jsonify({ |
| "status": "ok", |
| "users": len(DATA.get("users", {})), |
| "msg_map": len(DATA.get("msg_map", {})), |
| "telethon": TL_CLIENT is not None |
| }) |
|
|
| |
| if __name__ == "__main__": |
| Thread(target=lambda: app.run(host="0.0.0.0", port=7860), daemon=True).start() |
| Thread(target=start_telethon_worker, daemon=True).start() |
|
|
| print("🔄 正在清除旧连接...") |
| for attempt in range(5): |
| try: |
| bot.remove_webhook() |
| bot.get_updates(offset=-1, timeout=1) |
| break |
| except Exception as e: |
| print(f"⏳ 等待旧实例释放... ({attempt+1}/5) {e}") |
| time.sleep(3) |
|
|
| print("🤖 Telebot 主消息引擎已启动!") |
| print("🌐 Mini App 地址: http://localhost:7860/webapp") |
|
|
| while True: |
| try: |
| bot.infinity_polling( |
| timeout=60, |
| long_polling_timeout=60, |
| allowed_updates=["message", "callback_query", |
| "channel_post", "edited_channel_post"] |
| ) |
| except Exception as e: |
| print(f"❌ Polling 异常: {e}") |
| print("⏳ 10秒后重连...") |
| time.sleep(10) |
| try: |
| bot.remove_webhook() |
| bot.get_updates(offset=-1, timeout=1) |
| except: |
| pass |
|
|