File size: 4,859 Bytes
5ae0519 f6bb964 5ae0519 f6bb964 5ae0519 f6bb964 5ae0519 f6bb964 5ae0519 f6bb964 5ae0519 f6bb964 5ae0519 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | import asyncio
import os
import logging
import json # 【新增部分】引入 json 库解析小程序数据
from fastapi import FastAPI, Request
from aiogram import Bot, Dispatcher, types, F # 【新增部分】引入 F 用于魔法过滤
from aiogram.types import Update
from aiogram.enums import ParseMode
from aiogram.client.default import DefaultBotProperties
# 配置日志
logging.basicConfig(level=logging.INFO)
# 从环境变量获取配置 (在 HF Space 的 Settings -> Variables and secrets 里设置)
BOT_TOKEN = os.getenv("BOT_TOKEN")
WEBHOOK_URL = os.getenv("WEBHOOK_URL") # 你的 HF Space 网址
bot = Bot(token=BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
dp = Dispatcher()
app = FastAPI()
# ----------------- 模拟数据库 -----------------
# 警告:HF 重启会清空本地数据!后续请换成真实的数据库
CHANNEL_MAP = {
-1001234567890: -1009876543210 # 来源频道 ID : 目标频道 ID
}
# ----------------- 媒体组缓冲池 -----------------
album_buffer = {}
# ==========================================
# 【新增部分】处理从小程序 (TWA) 传回来的数据
# ==========================================
@dp.message(F.web_app_data)
async def handle_web_app_data(message: types.Message):
# 获取小程序发来的 JSON 字符串
data_str = message.web_app_data.data
try:
data = json.loads(data_str)
if data.get("action") == "set_sync":
source = data.get("source")
target = data.get("target")
logging.info(f"收到新配置: {source} -> {target}")
# 动态更新内存中的字典
CHANNEL_MAP[source] = target
await message.answer(
f"✅ <b>配置保存成功!</b>\n\n将来从 <code>{source}</code> 发布的消息会自动同步到 <code>{target}</code>。",
parse_mode=ParseMode.HTML
)
except Exception as e:
logging.error(f"解析小程序数据失败: {e}")
await message.answer("❌ 配置解析失败,请重试。")
# ==========================================
# ----------------- 处理频道消息同步 -----------------
@dp.channel_post()
async def handle_channel_post(message: types.Message):
source_id = message.chat.id
# 检查是否在我们配置的来源频道里
if source_id not in CHANNEL_MAP:
return
target_id = CHANNEL_MAP[source_id]
# 【核心逻辑 1:处理媒体组 / 相册】
if message.media_group_id:
mg_id = message.media_group_id
# 如果是这个相册的第一张图,创建列表并触发延迟任务
if mg_id not in album_buffer:
album_buffer[mg_id] = [message.message_id]
# 开启一个后台任务,等 2.5 秒后打包发送
asyncio.create_task(send_album_delayed(mg_id, source_id, target_id))
else:
# 如果不是第一张图,直接把消息 ID 塞进列表
album_buffer[mg_id].append(message.message_id)
# 【核心逻辑 2:处理单条消息(单图、单视频、纯文本)】
else:
await bot.copy_message(
chat_id=target_id,
from_chat_id=source_id,
message_id=message.message_id
)
# 延迟发送相册的任务
async def send_album_delayed(mg_id: str, source_id: int, target_id: int):
# 等待 2.5 秒,让 Telegram 把剩下的图片都推过来
await asyncio.sleep(2.5)
# 从缓冲池里取出所有 message_id 并删除这个 key
msg_ids = album_buffer.pop(mg_id, [])
if msg_ids:
try:
# 注意这里是 copy_messages (复数),完美保留相册结构和遮罩!
await bot.copy_messages(
chat_id=target_id,
from_chat_id=source_id,
message_ids=msg_ids
)
logging.info(f"成功同步媒体组: {mg_id}")
except Exception as e:
logging.error(f"同步媒体组失败: {e}")
# ----------------- FastAPI 路由与 Webhook -----------------
@app.on_event("startup")
async def on_startup():
# 启动时告诉 Telegram 把消息推送到我们的接口
webhook_path = f"{WEBHOOK_URL}/webhook"
await bot.set_webhook(url=webhook_path)
logging.info(f"Webhook 已设置为: {webhook_path}")
@app.post("/webhook")
async def telegram_webhook(request: Request):
# 接收 Telegram 推送的更新并丢给 aiogram 处理
update_dict = await request.json()
update = Update.model_validate(update_dict, context={"bot": bot})
await dp.feed_update(bot, update)
return {"status": "ok"}
@app.get("/ping")
async def ping():
# 保活接口:给 UptimeRobot 用的
return {"status": "alive"}
|