import asyncio import os import logging import json # 【新增部分】引入 json 库解析小程序数据 from fastapi import FastAPI, Request from aiogram import Bot, Dispatcher, types, F # 【新增部分】引入 F 用于魔法过滤 from aiogram.types import Update from aiogram.enums import ParseMode from aiogram.client.default import DefaultBotProperties # 配置日志 logging.basicConfig(level=logging.INFO) # 从环境变量获取配置 (在 HF Space 的 Settings -> Variables and secrets 里设置) BOT_TOKEN = os.getenv("BOT_TOKEN") WEBHOOK_URL = os.getenv("WEBHOOK_URL") # 你的 HF Space 网址 bot = Bot(token=BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) dp = Dispatcher() app = FastAPI() # ----------------- 模拟数据库 ----------------- # 警告:HF 重启会清空本地数据!后续请换成真实的数据库 CHANNEL_MAP = { -1001234567890: -1009876543210 # 来源频道 ID : 目标频道 ID } # ----------------- 媒体组缓冲池 ----------------- album_buffer = {} # ========================================== # 【新增部分】处理从小程序 (TWA) 传回来的数据 # ========================================== @dp.message(F.web_app_data) async def handle_web_app_data(message: types.Message): # 获取小程序发来的 JSON 字符串 data_str = message.web_app_data.data try: data = json.loads(data_str) if data.get("action") == "set_sync": source = data.get("source") target = data.get("target") logging.info(f"收到新配置: {source} -> {target}") # 动态更新内存中的字典 CHANNEL_MAP[source] = target await message.answer( f"✅ 配置保存成功!\n\n将来从 {source} 发布的消息会自动同步到 {target}。", parse_mode=ParseMode.HTML ) except Exception as e: logging.error(f"解析小程序数据失败: {e}") await message.answer("❌ 配置解析失败,请重试。") # ========================================== # ----------------- 处理频道消息同步 ----------------- @dp.channel_post() async def handle_channel_post(message: types.Message): source_id = message.chat.id # 检查是否在我们配置的来源频道里 if source_id not in CHANNEL_MAP: return target_id = CHANNEL_MAP[source_id] # 【核心逻辑 1:处理媒体组 / 相册】 if message.media_group_id: mg_id = message.media_group_id # 如果是这个相册的第一张图,创建列表并触发延迟任务 if mg_id not in album_buffer: album_buffer[mg_id] = [message.message_id] # 开启一个后台任务,等 2.5 秒后打包发送 asyncio.create_task(send_album_delayed(mg_id, source_id, target_id)) else: # 如果不是第一张图,直接把消息 ID 塞进列表 album_buffer[mg_id].append(message.message_id) # 【核心逻辑 2:处理单条消息(单图、单视频、纯文本)】 else: await bot.copy_message( chat_id=target_id, from_chat_id=source_id, message_id=message.message_id ) # 延迟发送相册的任务 async def send_album_delayed(mg_id: str, source_id: int, target_id: int): # 等待 2.5 秒,让 Telegram 把剩下的图片都推过来 await asyncio.sleep(2.5) # 从缓冲池里取出所有 message_id 并删除这个 key msg_ids = album_buffer.pop(mg_id, []) if msg_ids: try: # 注意这里是 copy_messages (复数),完美保留相册结构和遮罩! await bot.copy_messages( chat_id=target_id, from_chat_id=source_id, message_ids=msg_ids ) logging.info(f"成功同步媒体组: {mg_id}") except Exception as e: logging.error(f"同步媒体组失败: {e}") # ----------------- FastAPI 路由与 Webhook ----------------- @app.on_event("startup") async def on_startup(): # 启动时告诉 Telegram 把消息推送到我们的接口 webhook_path = f"{WEBHOOK_URL}/webhook" await bot.set_webhook(url=webhook_path) logging.info(f"Webhook 已设置为: {webhook_path}") @app.post("/webhook") async def telegram_webhook(request: Request): # 接收 Telegram 推送的更新并丢给 aiogram 处理 update_dict = await request.json() update = Update.model_validate(update_dict, context={"bot": bot}) await dp.feed_update(bot, update) return {"status": "ok"} @app.get("/ping") async def ping(): # 保活接口:给 UptimeRobot 用的 return {"status": "alive"}