import asyncio
import os
import logging
import json # 【新增部分】引入 json 库解析小程序数据
from fastapi import FastAPI, Request
from aiogram import Bot, Dispatcher, types, F # 【新增部分】引入 F 用于魔法过滤
from aiogram.types import Update
from aiogram.enums import ParseMode
from aiogram.client.default import DefaultBotProperties
# 配置日志
logging.basicConfig(level=logging.INFO)
# 从环境变量获取配置 (在 HF Space 的 Settings -> Variables and secrets 里设置)
BOT_TOKEN = os.getenv("BOT_TOKEN")
WEBHOOK_URL = os.getenv("WEBHOOK_URL") # 你的 HF Space 网址
bot = Bot(token=BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
dp = Dispatcher()
app = FastAPI()
# ----------------- 模拟数据库 -----------------
# 警告:HF 重启会清空本地数据!后续请换成真实的数据库
CHANNEL_MAP = {
-1001234567890: -1009876543210 # 来源频道 ID : 目标频道 ID
}
# ----------------- 媒体组缓冲池 -----------------
album_buffer = {}
# ==========================================
# 【新增部分】处理从小程序 (TWA) 传回来的数据
# ==========================================
@dp.message(F.web_app_data)
async def handle_web_app_data(message: types.Message):
# 获取小程序发来的 JSON 字符串
data_str = message.web_app_data.data
try:
data = json.loads(data_str)
if data.get("action") == "set_sync":
source = data.get("source")
target = data.get("target")
logging.info(f"收到新配置: {source} -> {target}")
# 动态更新内存中的字典
CHANNEL_MAP[source] = target
await message.answer(
f"✅ 配置保存成功!\n\n将来从 {source} 发布的消息会自动同步到 {target}。",
parse_mode=ParseMode.HTML
)
except Exception as e:
logging.error(f"解析小程序数据失败: {e}")
await message.answer("❌ 配置解析失败,请重试。")
# ==========================================
# ----------------- 处理频道消息同步 -----------------
@dp.channel_post()
async def handle_channel_post(message: types.Message):
source_id = message.chat.id
# 检查是否在我们配置的来源频道里
if source_id not in CHANNEL_MAP:
return
target_id = CHANNEL_MAP[source_id]
# 【核心逻辑 1:处理媒体组 / 相册】
if message.media_group_id:
mg_id = message.media_group_id
# 如果是这个相册的第一张图,创建列表并触发延迟任务
if mg_id not in album_buffer:
album_buffer[mg_id] = [message.message_id]
# 开启一个后台任务,等 2.5 秒后打包发送
asyncio.create_task(send_album_delayed(mg_id, source_id, target_id))
else:
# 如果不是第一张图,直接把消息 ID 塞进列表
album_buffer[mg_id].append(message.message_id)
# 【核心逻辑 2:处理单条消息(单图、单视频、纯文本)】
else:
await bot.copy_message(
chat_id=target_id,
from_chat_id=source_id,
message_id=message.message_id
)
# 延迟发送相册的任务
async def send_album_delayed(mg_id: str, source_id: int, target_id: int):
# 等待 2.5 秒,让 Telegram 把剩下的图片都推过来
await asyncio.sleep(2.5)
# 从缓冲池里取出所有 message_id 并删除这个 key
msg_ids = album_buffer.pop(mg_id, [])
if msg_ids:
try:
# 注意这里是 copy_messages (复数),完美保留相册结构和遮罩!
await bot.copy_messages(
chat_id=target_id,
from_chat_id=source_id,
message_ids=msg_ids
)
logging.info(f"成功同步媒体组: {mg_id}")
except Exception as e:
logging.error(f"同步媒体组失败: {e}")
# ----------------- FastAPI 路由与 Webhook -----------------
@app.on_event("startup")
async def on_startup():
# 启动时告诉 Telegram 把消息推送到我们的接口
webhook_path = f"{WEBHOOK_URL}/webhook"
await bot.set_webhook(url=webhook_path)
logging.info(f"Webhook 已设置为: {webhook_path}")
@app.post("/webhook")
async def telegram_webhook(request: Request):
# 接收 Telegram 推送的更新并丢给 aiogram 处理
update_dict = await request.json()
update = Update.model_validate(update_dict, context={"bot": bot})
await dp.feed_update(bot, update)
return {"status": "ok"}
@app.get("/ping")
async def ping():
# 保活接口:给 UptimeRobot 用的
return {"status": "alive"}