#!/usr/bin/env python # -*- coding: utf-8 -*- """ 一个功能齐全的、基于OpenAI兼容API的Telegram机器人。 V4.9 更新: - (修复) 修复 `RuntimeError: Cannot reopen a client instance` (httpx): - 移除了 `setu_command` 和 `check_rss_feed` 中多余的 `async with` 块。 - `httpx` 客户端现在被正确地在 `__init__` 中初始化,并在 `post_shutdown_cleanup` 中关闭,不再在每次命令调用时被错误地关闭。 - (V4.8) 新增 /setu 命令 (lolicon API)。 - (V4.7) 24小时群组活跃度定时器 (/check_group_activity)。 - (V4.6 修复) 修复 `AttributeError: 'str' object has no attribute 'decode'` (移除 .decode())。 - (V4.6 更改) @ 提及逻辑: (随机回复) 不再 @;(触发词回复) 现在 @ 消息发送者。 - (V4.5) 增加全面的内存缓存层 (权限/触发词/冷却)。 - (V4.5) 动态 RSS 订阅 (/subrss, /unsubrss)。 - (V4.4) 修复 绝对黑名单 (ScopeFilter 应用于基础指令)。 - (V4.4) 重构 群组历史消息格式 (用户名字: 消息 / 机器人名字: [回复 @用户]: 消息)。 - (V4.3) 修复 Markdown 回退错误 (Message to be replied not found)。 - (V4.3) 重构权限为 "群组白名单/话题黑名单" 机制 (/addgroup, /blacklisttopic)。 - (V4.2) 修复 httpx.ConnectError (图片) (verify=False)。 - (V4.1) 修复了触发词匹配逻辑 (包含匹配)。 - (V4.1) 重构了群组上下文管理 (内存缓存)。 - (V4.1) 修复了 OpenAIClient 中 'list index out of range' 的错误。 - (V4.1) 历史记录清洁 (不保存错误)。 - (V4.1) 增加了 AI 预设 (System Prompt) 管理系统。 - (V4.1) 增加了 .txt 文件处理。 - (V4.1) 修复了 API 消息格式 (text, image_url, assistant)。 功能: - 使用 python-telegram-bot 框架。 - 对接 OpenAI 兼容的 API (例如 Groq, Mistral, 或自定义端点)。 - 使用 Upstash Redis 缓存对话上下文。 - 支持多模态 (图片) 消息,图片以 Base64 格式存入 Redis 并发送到 API。 - 动态模型切换 ( \switchmodel ),启动时自动缓存模型列表。 - 严格的范围控制:响应私聊 和 (已白名单群组的*未*黑名单话题)。 - 基于 Redis 的动态管理员和白/黑名单系统。 - 详细的日志记录和模块化类结构。 所需库: - python-telegram-bot - upstash-redis - httpx (替代 openai 库) - python-dotenv """ import os import logging import json import uuid import io import base64 import asyncio import re # <-- 新增: 用于触发词和预设解析 import random # <-- 新增: 用于随机回复 import time # <-- 新增: 用于内存冷却 from typing import List, Dict, Any, Optional, Set # 运行: pip install python-telegram-bot upstash-redis httpx python-dotenv import httpx # 使用 httpx 直接请求 API from dotenv import load_dotenv from upstash_redis import Redis # from openai import OpenAI, AsyncOpenAI # 不再使用 OpenAI SDK from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, BotCommand, Message from telegram.constants import ChatAction, ParseMode # <-- 新增 ParseMode from telegram.error import BadRequest # <-- 新增 BadRequest 用于 Markdown 回退 from telegram.ext import ( Application, ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, CallbackQueryHandler, filters, Job, # <-- 新增: 用于定时器 JobQueue # <-- 新增: 用于定时器 ) # --- 日志配置 --- # 配置日志记录 logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) # 将 httpx 的日志级别调高,因为它在 DEBUG 级别下过于嘈杂 logging.getLogger("httpx").setLevel(logging.WARNING) # (新增) 禁用 httpx 的 SSL 警告 import warnings import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) logging.getLogger("urllib3").setLevel(logging.WARNING) logger = logging.getLogger(__name__) # --- 1. 配置类 --- class Config: """ 管理所有从环境变量加载的配置。 """ def __init__(self): # 加载 .env 文件 (如果存在) load_dotenv() logger.info("正在加载环境变量...") # Telegram self.TELEGRAM_BOT_TOKEN: str = self.get_env_required("TELEGRAM_BOT_TOKEN") # OpenAI 兼容 API self.OPENAI_API_URL: str = self.get_env_required("OPENAI_COMPATIBLE_URL") self.OPENAI_API_KEY: str = self.get_env_required("OPENAI_COMPATIBLE_KEY") self.DEFAULT_MODEL: str = os.getenv("DEFAULT_MODEL", "gpt-3.5-turbo") # Upstash Redis (upstash-redis 库需要这两个) self.UPSTASH_REDIS_URL: str = self.get_env_required("UPSTASH_REDIS_REST_URL") self.UPSTASH_REDIS_TOKEN: str = self.get_env_required("UPSTASH_REDIS_REST_TOKEN") # 权限 self.ADMIN_USERS: Set[int] = self.parse_int_set_from_env("ADMIN_USERS") # (新增) RSS self.RSS_URL: str = os.getenv("RSS_URL", "https://ci-en.dlsite.com/creator/4551/article/xml/rss") # (V4.5) 移除 RSS_CHAT_IDS, 改为动态订阅 if not self.ADMIN_USERS: logger.warning("未在环境变量中定义 'ADMIN_USERS'。某些管理功能将受限。") logger.info("环境变量加载完毕。") def get_env_required(self, var_name: str) -> str: """获取必需的环境变量,如果缺失则抛出异常。""" value = os.getenv(var_name) if value is None: logger.error(f"严重错误: 环境变量 '{var_name}' 未设置。") raise ValueError(f"环境变量 '{var_name}' 必须被设置。") return value def parse_int_set_from_env(self, var_name: str) -> Set[int]: """从逗号分隔的环境变量字符串解析为整数集合。""" value_str = os.getenv(var_name) if not value_str: return set() try: return {int(x.strip()) for x in value_str.split(',') if x.strip()} except ValueError: logger.error(f"无法解析环境变量 '{var_name}'。请确保它是逗号分隔的整数ID。") return set() @property def OP_USER_ID(self) -> Optional[int]: """ 获取"超级管理员" (OP) 的ID,即 ADMIN_USERS 列表中的第一个。 用于接收启动通知。 """ if self.ADMIN_USERS: return next(iter(self.ADMIN_USERS)) return None # --- 2. Redis 管理类 --- class RedisManager: """ 封装所有与 Upstash Redis 的交互。 (V4.5: 主要负责*写入*和*初始加载*,读取操作由内存缓存处理) """ # Redis 键名常量 KEY_MODEL_LIST = "bot:model_list" KEY_ADMIN_USERS = "bot:admin_users" # V4.3 重构: KEY_WHITELISTED_GROUPS = "bot:whitelisted_groups" # (Set) 允许的群组 ID KEY_BLACKLISTED_TOPICS = "bot:blacklisted_topics" # (Set) 禁言的话题 Key (chat_id:thread_id) KEY_GROUP_TRIGGERS = "bot:group_triggers" # HASH, chat_id -> trigger_word # 预设键 KEY_PRESETS_PREFIX = "bot:presets:" # HASH, {user_id} -> {preset_name} -> json(messages) KEY_ACTIVE_PRESET_PREFIX = "bot:active_preset:" # STRING, {user_id} -> preset_name # RSS 键 KEY_LAST_RSS_LINK = "bot:last_rss_link" # STRING, 存储最新的 article_id KEY_RSS_SUBSCRIPTIONS = "bot:rss_subscriptions" # (V4.5 新增) Set, 存储订阅的 chat_id # V4.7 新增: 活跃度 KEY_LAST_RESPONSE_TIMES = "bot:last_response_times" # HASH, context_key -> timestamp # 会话超时时间 (例如: 24 小时) SESSION_EXPIRATION_SEC = 86400 # 24 * 60 * 60 # 上下文历史消息限制 CONTEXT_HISTORY_LIMIT = 200 def __init__(self, config: Config): try: self.redis = Redis( url=config.UPSTASH_REDIS_URL, token=config.UPSTASH_REDIS_TOKEN ) # 测试连接 self.redis.ping() logger.info("成功连接到 Upstash Redis。") except Exception as e: logger.error(f"无法连接到 Upstash Redis: {e}", exc_info=True) raise # --- 键生成辅助方法 --- def _get_context_key(self, user_id: int, chat_id: int, thread_id: Optional[int]) -> str: """ 重构: 获取上下文的唯一键。 - 私聊: context:user:{user_id} - 群组话题: context:group:{chat_id}:{thread_id_key} """ if chat_id > 0: # V4.3 修复: 私聊 (chat_id == user_id) return f"context:user:{user_id}" # V4.3 重构: 统一处理 "常规" 话题 (None -> 0) thread_id_key = thread_id if thread_id is not None else 0 return f"context:group:{chat_id}:{thread_id_key}" def _user_model_key(self, user_id: int) -> str: """获取跟踪用户当前模型的键 (保持不变)。""" return f"user:{user_id}:current_model" def _user_presets_key(self, user_id: int) -> str: """(新增) 获取用户预设 HASH 的键。""" return f"{self.KEY_PRESETS_PREFIX}{user_id}" def _user_active_preset_key(self, user_id: int) -> str: """(新增) 获取用户当前激活预设的键。""" return f"{self.KEY_ACTIVE_PRESET_PREFIX}{user_id}" # --- 启动初始化 --- def initialize_from_env(self, initial_admins: Set[int]): # V4.2: 移除了 initial_chats """ 在启动时,将环境变量中的初始管理员同步到 Redis。 (V4.2: 话题白名单现在必须通过命令添加) """ try: if initial_admins: # SADD 返回成功添加的新成员数量 added_admins = self.redis.sadd(self.KEY_ADMIN_USERS, *initial_admins) logger.info(f"已将 {added_admins} 个新管理员从环境变量同步到 Redis。") except Exception as e: logger.error(f"初始化 Redis 数据时出错: {e}", exc_info=True) # --- 会话管理 (私聊) --- def clear_session(self, user_id: int, chat_id: int, thread_id: Optional[int]): """重构: 清除当前上下文 (/new)。(私聊或群组均可调用)""" context_key = self._get_context_key(user_id, chat_id, thread_id) # 注意: 这只会清除 Redis。群组的内存缓存将在下一次消息时自动重置。 # (或者在 /new 命令中单独清除内存) self.redis.delete(context_key) logger.info(f"已为键 {context_key} 清除 Redis 会话。") def get_conversation_history(self, user_id: int, chat_id: int, thread_id: Optional[int]) -> List[Dict[str, Any]]: """ 重构: 根据上下文键获取对话历史。 (现在主要用于私聊,或群组的初始加载) """ context_key = self._get_context_key(user_id, chat_id, thread_id) raw_data = self.redis.get(context_key) if not raw_data: return [] try: history = json.loads(raw_data) if not isinstance(history, list): logger.warning(f"上下文 {context_key} 的数据格式不正确 (非列表),已重置。") return [] # 刷新 TTL self.redis.expire(context_key, self.SESSION_EXPIRATION_SEC) # 只返回最后 200 条 return history[-self.CONTEXT_HISTORY_LIMIT:] except json.JSONDecodeError: logger.error(f"无法解析上下文 {context_key} 的JSON数据,已重置。") return [] def add_to_conversation(self, user_id: int, chat_id: int, thread_id: Optional[int], role: str, content: Any): """ 重构: 向指定的上下文添加一条消息。 (现在主要用于私聊) (V4.1: content 已经是新格式的列表) """ context_key = self._get_context_key(user_id, chat_id, thread_id) try: raw_data = self.redis.get(context_key) if not raw_data: history = [] else: try: history = json.loads(raw_data) if not isinstance(history, list): history = [] except json.JSONDecodeError: history = [] # 添加新消息 (content 已经是新格式) history.append({"role": role, "content": content}) # 写入 (私聊立即写) self.redis.set(context_key, json.dumps(history), ex=self.SESSION_EXPIRATION_SEC) logger.debug(f"已将上下文 {context_key} 的 {role} 消息存入 Redis。") except Exception as e: logger.error(f"向上下文 {context_key} (私聊) 添加消息时出错: {e}", exc_info=True) # --- 模型管理 --- def get_current_model(self, user_id: int, default_model: str) -> str: """获取用户的首选模型,如果未设置则返回默认值。""" model = self.redis.get(self._user_model_key(user_id)) # V4.6 修复: 移除 .decode() return model if model else default_model def set_current_model(self, user_id: int, model_name: str): """设置用户的首选模型。""" self.redis.set(self._user_model_key(user_id), model_name) logger.info(f"用户 {user_id} 将模型切换为: {model_name}") def cache_model_list(self, models: List[str]): """将从API获取的模型列表缓存到 Redis。""" if not models: return # 缓存 1 小时 self.redis.set(self.KEY_MODEL_LIST, json.dumps(models), ex=3600) logger.info(f"已缓存 {len(models)} 个模型到 Redis。") def get_cached_model_list(self) -> Optional[List[str]]: """从 Redis 获取缓存的模型列表。""" raw_data = self.redis.get(self.KEY_MODEL_LIST) if not raw_data: return None try: return json.loads(raw_data) except json.JSONDecodeError: return None # --- 权限管理 (Admin) (V4.5: 读/写) --- def get_admin_users(self) -> Set[int]: """(V4.5) 从 Redis *读取*所有管理员ID (用于启动加载)。""" try: # V4.6 修复: 移除 .decode() return {int(uid) for uid in self.redis.smembers(self.KEY_ADMIN_USERS)} except Exception as e: logger.error(f"获取管理员列表时出错: {e}", exc_info=True) return set() def add_admin(self, user_id: int) -> bool: """(V4.5) *写入*管理员到 Redis。""" try: return self.redis.sadd(self.KEY_ADMIN_USERS, user_id) == 1 except Exception as e: logger.error(f"添加管理员 {user_id} 时出错: {e}") return False def remove_admin(self, user_id: int) -> bool: """(V4.5) 从 Redis *移除*管理员。""" try: return self.redis.srem(self.KEY_ADMIN_USERS, user_id) == 1 except Exception as e: logger.error(f"移除管理员 {user_id} 时出错: {e}") return False # --- 权限管理 (V4.5: Whitelist Group / Blacklist Topic) (读/写) --- # (V4.5 新增) 群组白名单 def get_whitelisted_groups(self) -> Set[int]: """(V4.5) 从 Redis *读取*所有白名单群组 (用于启动加载)。""" try: # V4.6 修复: 移除 .decode() return {int(cid) for cid in self.redis.smembers(self.KEY_WHITELISTED_GROUPS)} except Exception as e: logger.error(f"获取白名单群组时出错: {e}", exc_info=True) return set() def add_group_whitelist(self, chat_id: int) -> bool: """(V4.5) *写入*群组到白名单 Redis。""" try: return self.redis.sadd(self.KEY_WHITELISTED_GROUPS, chat_id) == 1 except Exception as e: logger.error(f"添加白名单群组 {chat_id} 时出错: {e}") return False # (V4.5 重构) 话题黑名单 def get_blacklisted_topics(self) -> Set[str]: """(V4.5) 从 Redis *读取*所有黑名单话题 (用于启动加载)。""" try: # V4.6 修复: 移除 .decode() return {k for k in self.redis.smembers(self.KEY_BLACKLISTED_TOPICS)} except Exception as e: logger.error(f"获取黑名单话题时出错: {e}", exc_info=True) return set() def add_blacklisted_topic(self, topic_key: str) -> bool: """(V4.5) *写入*话题到黑名单 Redis。""" try: return self.redis.sadd(self.KEY_BLACKLISTED_TOPICS, topic_key) == 1 except Exception as e: logger.error(f"添加黑名单话题 {topic_key} 时出错: {e}") return False # --- 群组触发词管理 (V4.5: 读/写) --- def get_all_group_triggers(self) -> Dict[int, str]: """(V4.5) 从 Redis *读取*所有群组触发词 (用于启动加载)。""" try: # V4.6 修复: 移除 .decode() return {int(k): v for k, v in self.redis.hgetall(self.KEY_GROUP_TRIGGERS).items()} except Exception as e: logger.error(f"获取所有群组触发词时出错: {e}", exc_info=True) return {} def set_group_trigger(self, chat_id: int, word: str): """(V4.5) *写入*群组触发词到 Redis。""" self.redis.hset(self.KEY_GROUP_TRIGGERS, str(chat_id), word) # --- 预设 (Preset) 管理 --- def set_preset(self, user_id: int, name: str, messages: List[Dict[str, Any]]) -> bool: """ 保存一个预设 (HSET)。 (V4.1: messages 已经是新格式) """ try: key = self._user_presets_key(user_id) self.redis.hset(key, name, json.dumps(messages)) return True except Exception as e: logger.error(f"为用户 {user_id} 保存预设 '{name}' 时出错: {e}") return False def get_preset(self, user_id: int, name: str) -> Optional[List[Dict[str, Any]]]: """获取一个特定的预设 (HGET)。""" try: key = self._user_presets_key(user_id) raw_data = self.redis.hget(key, name) if not raw_data: return None return json.loads(raw_data) except Exception as e: logger.error(f"为用户 {user_id} 获取预设 '{name}' 时出错: {e}") return None def delete_preset(self, user_id: int, name: str) -> bool: """删除一个预设 (HDEL)。""" try: key = self._user_presets_key(user_id) # 检查是否删除了当前激活的预设 active_preset = self.get_active_preset_name(user_id) if active_preset == name: self.set_active_preset(user_id, "") # 清空激活的预设 return self.redis.hdel(key, name) > 0 except Exception as e: logger.error(f"为用户 {user_id} 删除预设 '{name}' 时出错: {e}") return False def list_presets(self, user_id: int) -> List[str]: """列出用户的所有预设名称 (HKEYS)。""" try: key = self._user_presets_key(user_id) # (V4.6) 修复 bytes decode return self.redis.hkeys(key) except Exception as e: logger.error(f"为用户 {user_id} 列出预设时出错: {e}") return [] def set_active_preset(self, user_id: int, name: str): """设置当前激活的预设 (SET)。""" key = self._user_active_preset_key(user_id) if name: self.redis.set(key, name) else: # 如果 name 为空,则删除键 self.redis.delete(key) def get_active_preset_name(self, user_id: int) -> Optional[str]: """获取当前激活的预设名称 (GET)。""" key = self._user_active_preset_key(user_id) # (V4.6) 修复 bytes decode return self.redis.get(key) def get_active_preset_messages(self, user_id: int) -> List[Dict[str, Any]]: """(核心) 获取当前激活的预设的消息列表。""" name = self.get_active_preset_name(user_id) if not name: return [] messages = self.get_preset(user_id, name) return messages if messages else [] # --- (新增) RSS 管理 --- def get_last_rss_link(self) -> Optional[str]: """获取最后已知的 RSS 链接 ID。""" # (V4.6) 修复 bytes decode return self.redis.get(self.KEY_LAST_RSS_LINK) def set_last_rss_link(self, link_id: str): """设置最新的 RSS 链接 ID。""" self.redis.set(self.KEY_LAST_RSS_LINK, link_id) def get_rss_subscribers(self) -> Set[int]: """(V4.5) 从 Redis *读取*所有 RSS 订阅者。""" try: # (V4.6) 修复 bytes decode return {int(cid) for cid in self.redis.smembers(self.KEY_RSS_SUBSCRIPTIONS)} except Exception as e: logger.error(f"获取 RSS 订阅者时出错: {e}") return set() def add_rss_subscriber(self, chat_id: int) -> bool: """(V4.5) *写入* RSS 订阅者。""" try: return self.redis.sadd(self.KEY_RSS_SUBSCRIPTIONS, chat_id) == 1 except Exception as e: logger.error(f"添加 RSS 订阅者 {chat_id} 时出错: {e}") return False def remove_rss_subscriber(self, chat_id: int) -> bool: """(V4.5) *移除* RSS 订阅者。""" try: return self.redis.srem(self.KEY_RSS_SUBSCRIPTIONS, chat_id) == 1 except Exception as e: logger.error(f"移除 RSS 订阅者 {chat_id} 时出错: {e}") return False # --- (V4.7 新增) 活跃度管理 --- def update_last_response_time(self, context_key: str): """(V4.7) 更新一个话题的最后响应时间戳。""" try: self.redis.hset(self.KEY_LAST_RESPONSE_TIMES, context_key, time.time()) except Exception as e: logger.error(f"更新最后响应时间失败 {context_key}: {e}") def get_all_last_response_times(self) -> Dict[str, float]: """(V4.7) 获取所有话题的最后响应时间戳 (用于启动检查)。""" try: # V4.6 修复: 移除 .decode() return {k: float(v) for k, v in self.redis.hgetall(self.KEY_LAST_RESPONSE_TIMES).items()} except Exception as e: logger.error(f"获取所有最后响应时间失败: {e}", exc_info=True) return {} # --- 3. OpenAI API 客户端类 (使用 httpx) --- class OpenAIClient: """ 封装所有与 OpenAI 兼容 API 的交互。 使用 httpx.AsyncClient 进行异步 API 请求。 """ def __init__(self, config: Config): self.base_url = config.OPENAI_API_URL self.headers = { "Authorization": f"Bearer {config.OPENAI_API_KEY}", "Content-Type": "application/json" } # 修复 V4.2: 禁用 SSL 验证 (verify=False) 以修复 ConnectError logger.warning("!!! 安全警告: 正在为 OpenAIClient 禁用 SSL 验证 (verify=False)。") logger.warning("!!! 这可以修复 ConnectError,但会带来安全风险。请确保 API 端点可信。") self.client = httpx.AsyncClient( base_url=self.base_url, headers=self.headers, timeout=30.0, # 设置 30 秒超时 follow_redirects=True, verify=False # <-- 修复: 禁用 SSL 验证 ) self.default_model = config.DEFAULT_MODEL logger.info(f"OpenAI 客户端 (httpx) 已初始化,指向: {config.OPENAI_API_URL}") async def get_models(self) -> List[str]: """ 从 API 获取可用模型列表。 """ try: logger.info("正在从 API (httpx) 获取模型列表...") # 兼容的 API 端点通常是 /v1/models response = await self.client.get("/v1/models") response.raise_for_status() # 如果状态码不是 2xx,则抛出异常 models_data = response.json() # 过滤并返回所有模型ID (接受所有模型) model_ids = [model['id'] for model in models_data.get('data', []) if model.get('id')] logger.info(f"成功获取到 {len(model_ids)} 个模型。") return sorted(model_ids) except httpx.HTTPStatusError as e: logger.error(f"API 请求失败 (状态码 {e.response.status_code}): {e.response.text}", exc_info=True) return [] except Exception as e: logger.error(f"从 API 获取模型列表失败 (httpx): {e}", exc_info=True) return [] async def generate_response(self, model: str, history: List[Dict[str, Any]]) -> Optional[str]: """ 调用聊天补全 (Chat Completions) API。 'history' 必须是符合 API 格式的列表。 修复: 增加了对空 'choices' 列表的检查,防止 'list index out of range'。 """ logger.debug(f"向模型 {model} (httpx) 发送请求,包含 {len(history)} 条历史消息。") if history and "image_url" in str(history[-1]): logger.debug("请求中包含图片。") # 兼容的 API 端点通常是 /v1/chat/completions endpoint = "/v1/chat/completions" payload = { "model": model, "messages": history, "stream": False # 不使用流式响应,简化处理 } try: response = await self.client.post(endpoint, json=payload) response.raise_for_status() # 检查 HTTP 错误 data = response.json() # 修复: 检查 'choices' 是否存在且不为空 choices = data.get('choices', []) if not choices or 'message' not in choices[0]: logger.warning(f"API 响应中未找到 'choices' 或 'message'。响应: {data}") return "抱歉,AI 响应为空或格式不正确。" # 修复 V4.1: AI 的回复也在 content 列表中 # (但大多数兼容 API 仍然返回 {role:"...", content:"..."}) # 我们必须检查两种情况 message = choices[0].get('message', {}) response_content = message.get('content') if response_content is None: logger.warning(f"API 响应中 'content' 为空。响应: {data}") return "抱歉,AI 响应格式不正确 (content 为 null)。" # 检查 content 是字符串 (标准) 还是列表 (新格式) if isinstance(response_content, str): response_text = response_content elif isinstance(response_content, list) and len(response_content) > 0 and response_content[0].get('type') == 'text': response_text = response_content[0].get('text', '') else: logger.warning(f"API 响应的 'content' 格式未知: {response_content}") return "抱歉,AI 响应格式未知。" logger.debug(f"模型 {model} 成功返回响应。") return response_text except httpx.ConnectError as e: # 捕获特定的 ConnectError logger.error(f"调用 OpenAI API ({model}) 时(httpx)遇到连接错误 (ConnectError): {e}", exc_info=True) return f"抱歉,AI 连接失败 (ConnectError): {str(e)}" except httpx.HTTPStatusError as e: error_message = f"API 返回错误 (状态码 {e.response.status_code})。详情: {e.response.text}" logger.error(f"调用 OpenAI API ({model}) 时出错: {error_message}", exc_info=True) return f"抱歉,调用 AI 模型时出错: {error_message}" except Exception as e: logger.error(f"调用 OpenAI API ({model}) 时(httpx)遇到意外错误: {e}", exc_info=True) # 向用户返回一个更友好的错误信息 return f"抱歉,调用 AI 模型时出错: {str(e)}" async def close(self): """关闭 httpx 客户端。""" await self.client.aclose() # --- 4. 自定义过滤器 --- class AdminFilter(filters.BaseFilter): """ (V4.5) 自定义 PTB 过滤器,用于检查用户是否为管理员 (从内存缓存读取)。 """ def __init__(self, bot: 'TelegramBot'): # V4.5: 传入 bot 实例 self.bot = bot super().__init__(name="AdminFilter") def filter(self, message: Message) -> bool: # V4.5: 从内存缓存读取 return message.from_user.id in self.bot.admin_users class ScopeFilter(filters.BaseFilter): """ (V4.5) 自定义 PTB 过滤器,用于控制机器人的响应范围 (从内存缓存读取)。 允许的条件 (OR): 1. 私聊 (private) 2. (群组在白名单中 AND 话题*未*在黑名单中) """ def __init__(self, bot: 'TelegramBot'): # V4.5: 传入 bot 实例 self.bot = bot super().__init__(name="ScopeFilter") def filter(self, message: Message) -> bool: # 1. 允许私聊 if message.chat.type == "private": return True # 2. 检查群组 if message.chat.type in ("group", "supergroup"): # 2.1 检查群组是否在白名单中 (V4.5: 从内存读取) if not message.chat.id in self.bot.whitelisted_groups: logger.debug(f"忽略来自 {message.chat.id} 的消息 (群组未在白名单)。") return False # 2.2 检查话题是否在黑名单中 (V4.5: 从内存读取) thread_id = message.message_thread_id thread_id_key = thread_id if thread_id is not None else 0 topic_key = f"{message.chat.id}:{thread_id_key}" if topic_key in self.bot.blacklisted_topics: logger.debug(f"忽略来自 {topic_key} 的消息 (话题在黑名单)。") return False # 群组在白名单中,且话题不在黑名单中 return True # 3. 忽略其他所有情况 (如频道) return False # --- 5. Telegram 机器人主类 --- class TelegramBot: """ 组织所有 Telegram 机器人逻辑、命令和消息处理器。 """ # 群组回复配置 DEFAULT_GROUP_REPLY_CHANCE = 0.15 # 15% 随机回复概率 TRIGGER_COOLDOWN_SEC = 30 # 30 秒触发词冷却 # 预设解析正则表达式 PRESET_REGEX = re.compile( r"^(name|user|system|assistant)[\s::]+(.+)", re.IGNORECASE | re.MULTILINE ) def __init__(self, config: Config, redis: RedisManager, openai: OpenAIClient): self.config = config self.redis = redis self.openai = openai # (V4.5) 内存缓存 self.group_context_cache: Dict[str, List[Dict[str, Any]]] = {} self.cache_lock = asyncio.Lock() # 用于 group_context_cache self.permission_lock = asyncio.Lock() # (V4.5) 用于权限/触发词缓存 self.save_job: Optional[Job] = None self.activity_check_job: Optional[Job] = None # V4.7 新增 # (V4.5) 权限和触发词的内存缓存 self.admin_users: Set[int] = set() self.whitelisted_groups: Set[int] = set() self.blacklisted_topics: Set[str] = set() self.group_triggers: Dict[int, str] = {} self.trigger_cooldowns: Dict[str, float] = {} # Key: "chat_id:word", Val: expiry_timestamp # (新增) RSS 客户端 self.rss_client = httpx.AsyncClient(timeout=10.0, verify=False) # 同样禁用 SSL # (V4.8 新增) Setu 客户端 self.setu_client = httpx.AsyncClient(timeout=20.0, verify=False, follow_redirects=True) # (新增 V4.4) 存储机器人名字 self.bot_name: Optional[str] = "Bot" # 默认值 self.application = ApplicationBuilder() \ .token(config.TELEGRAM_BOT_TOKEN) \ .post_init(self.post_init_setup) \ .post_shutdown(self.post_shutdown_cleanup) \ .job_queue(JobQueue()) \ .build() # 实例化自定义过滤器 (V4.5: 传入 self) self.admin_filter = AdminFilter(self) self.scope_filter = ScopeFilter(self) # 存储模型列表,避免频繁查询 Redis self._model_list_cache: List[str] = [] def setup_handlers(self): """注册所有的命令和消息处理器。""" logger.info("正在注册处理器...") # --- 核心命令 --- start_handler = CommandHandler("start", self.start_command, filters=filters.ChatType.PRIVATE) # (V4.4) 为基础指令添加 scope_filter 以实现绝对黑名单 help_handler = CommandHandler("help", self.help_command, filters=self.scope_filter) new_handler = CommandHandler("new", self.new_command, filters=self.scope_filter) switch_model_handler = CommandHandler("switchmodel", self.switch_model_command, filters=self.scope_filter) # (V4.8 新增) Setu 命令 setu_handler = CommandHandler("setu", self.setu_command, filters=self.scope_filter) # --- 管理员命令 (使用 admin_filter) --- add_admin_handler = CommandHandler("addadmin", self.add_admin_command, filters=self.admin_filter) del_admin_handler = CommandHandler("deladmin", self.del_admin_command, filters=self.admin_filter) save_history_handler = CommandHandler("savehistory", self.save_history_command, filters=self.admin_filter) # V4.3 重构: add_group_handler = CommandHandler( "addgroup", self.add_group_whitelist_command, filters=self.admin_filter & filters.ChatType.GROUPS ) blacklist_topic_handler = CommandHandler( "blacklisttopic", self.blacklist_topic_command, filters=self.admin_filter & filters.ChatType.GROUPS ) group_admin_filters = self.admin_filter & filters.ChatType.GROUPS set_trigger_handler = CommandHandler( "settrigger", self.set_trigger_command, filters=group_admin_filters ) # --- 预设 (Preset) 命令 --- set_preset_handler = CommandHandler( "setpreset", self.set_preset_command, # 允许私聊, 允许文本(用于粘贴)和 .txt 文档 filters=filters.ChatType.PRIVATE & (filters.TEXT | filters.Document.TXT) ) list_presets_handler = CommandHandler("listpresets", self.list_presets_command, filters=filters.ChatType.PRIVATE) switch_preset_handler = CommandHandler("switchpreset", self.switch_preset_command, filters=filters.ChatType.PRIVATE) del_preset_handler = CommandHandler("delpreset", self.del_preset_command, filters=filters.ChatType.PRIVATE) # --- (V4.5 新增) RSS 命令 --- # (V4.5) scope_filter 确保 /subrss 不能在黑名单话题中运行 sub_rss_handler = CommandHandler("subrss", self.sub_rss_command, filters=self.scope_filter) unsub_rss_handler = CommandHandler("unsubrss", self.unsub_rss_command, filters=self.scope_filter) # --- 回调处理器 (用于模型切换) --- model_callback_handler = CallbackQueryHandler(self.select_model_callback, pattern="^model_select_") # --- 核心消息处理器 (使用 scope_filter) --- message_handler = MessageHandler( (filters.TEXT | filters.PHOTO | filters.Document.TXT) & self.scope_filter & (~filters.COMMAND), self.handle_message ) # 注册所有处理器 handlers = [ start_handler, help_handler, new_handler, switch_model_handler, setu_handler, # V4.8 新增 add_admin_handler, del_admin_handler, add_group_handler, blacklist_topic_handler, # V4.3 更改 set_trigger_handler, save_history_handler, set_preset_handler, list_presets_handler, switch_preset_handler, del_preset_handler, sub_rss_handler, unsub_rss_handler, # V4.5 新增 model_callback_handler, message_handler ] self.application.add_handlers(handlers) logger.info("处理器注册完毕。") async def post_init_setup(self, application: Application): """ 在机器人启动时(run_polling被调用后)执行的异步初始化任务。 (V4.7 更新) """ logger.info("正在执行机器人启动后数据初始化 (post_init)...") # (V4.4 新增) 获取机器人名字 try: bot_user = await application.bot.get_me() self.bot_name = bot_user.name logger.info(f"成功获取机器人名字: {self.bot_name}") except Exception as e: logger.error(f"无法获取机器人名字: {e}", exc_info=True) self.bot_name = "Assistant" # 设置回退 # (V4.5) 1. 从 Redis 加载权限和配置到内存 self.admin_users = self.redis.get_admin_users() self.whitelisted_groups = self.redis.get_whitelisted_groups() self.blacklisted_topics = self.redis.get_blacklisted_topics() self.group_triggers = self.redis.get_all_group_triggers() logger.info(f"成功加载 {len(self.admin_users)} 个管理员, {len(self.whitelisted_groups)} 个白名单群组, {len(self.blacklisted_topics)} 个黑名单话题, {len(self.group_triggers)} 个触发词到内存。") # 2. 同步环境变量 (Admin) 到 Redis (如果 Redis 为空) self.redis.initialize_from_env(self.config.ADMIN_USERS) # 3. 获取和缓存模型 models = await self.openai.get_models() if models: self._model_list_cache = models self.redis.cache_model_list(models) else: logger.warning("无法从API获取模型列表,将尝试使用 Redis 缓存 (如果存在)。") cached_models = self.redis.get_cached_model_list() if cached_models: self._model_list_cache = cached_models logger.info("已从 Redis 缓存加载模型列表。") else: logger.error("严重: API 和 Redis 缓存均无模型列表。 \switchmodel 将无法工作。") # 4. 从 Redis 加载群组历史到内存 await self.load_group_history_from_redis() # 5. 启动定时任务 (群组保存 + RSS + 活跃度) if not self.application.job_queue: logger.error("JobQueue 未初始化! 无法启动定时任务。") else: # 群组保存 self.save_job = self.application.job_queue.run_repeating( self.save_all_group_history_to_redis, interval=3600, # 1 hour first=3600, # 1 小时后开始 name="hourly_save" ) logger.info("已启动每小时群组历史缓存定时器。") # (新增) RSS 检查 if self.config.RSS_URL: # V4.5: 只要 URL 存在就启动 self.application.job_queue.run_repeating( self.check_rss_feed, interval=600, # 10 minutes first=10, # 10 秒后开始 name="rss_check" ) logger.info(f"已启动每 10 分钟 RSS 检查任务 (URL: {self.config.RSS_URL})。") else: logger.info("未配置 RSS_URL,跳过 RSS 任务。") # (V4.7 新增) 启动群组活跃度检查 self.activity_check_job = self.application.job_queue.run_repeating( self.check_group_activity, interval=3600, # 每小时检查一次 first=60, # 启动 1 分钟后 name="group_activity_check" ) logger.info("已启动每小时群组活跃度检查定时器。") # 6. 注册 Telegram 命令 commands = [ BotCommand("help", "查看所有指令和帮助"), BotCommand("new", "开始一个新的对话会话"), BotCommand("switchmodel", "切换聊天模型"), BotCommand("setu", "(V4.8) 随机获取一张图片 (可加 tag)"), # <-- 新增 BotCommand("subrss", "(V4.5) 订阅 RSS 更新到此聊天"), BotCommand("unsubrss", "(V4.5) 取消此聊天的 RSS 订阅"), BotCommand("setpreset", "(私聊) 设置AI预设 (通过文本或文件)"), BotCommand("listpresets", "(私聊) 查看我的AI预设"), BotCommand("switchpreset", "(私聊) 切换AI预设"), BotCommand("delpreset", "(私聊) 删除AI预设"), BotCommand("savehistory", "(管理员) 手动保存群组缓存到Redis"), BotCommand("addgroup", "(管理员/群组中) 将此群组添加至白名单"), # V4.3 新增 BotCommand("blacklisttopic", "(管理员/话题中) 将此话题添加至黑名单"), # V4.3 新增 BotCommand("settrigger", "(管理员/群组中) 设置本群的AI回复触发词"), BotCommand("addadmin", "(管理员) 添加管理员 (需回复或提供ID)"), BotCommand("deladmin", "(管理员) 移除管理员 (需回复或提供ID)"), ] await application.bot.set_my_commands(commands) logger.info("Telegram Bot 命令已注册。") # 7. 发送启动通知 op_id = self.config.OP_USER_ID if not op_id: logger.warning("未配置 OP_USER_ID,跳过启动通知。") return try: help_text = self._get_help_text() startup_msg = f"✅ **机器人 {self.bot_name} 已成功启动!**\n\n{help_text}" await application.bot.send_message( chat_id=op_id, text=startup_msg, parse_mode="Markdown" ) logger.info(f"已成功向 OP ({op_id}) 发送启动通知。") except Exception as e: logger.error(f"向 OP ({op_id}) 发送启动消息失败: {e}", exc_info=True) async def post_shutdown_cleanup(self, application: Application): """ 在机器人关闭时(shutdown被调用后)执行的异步清理任务。 (V4.8 更新) """ logger.info("正在执行异步清理 (post_shutdown)...") # 关机前最后保存一次 if self.group_context_cache: logger.info("正在执行关机前最后一次群组历史保存...") # 创建一个临时的 context 对象 fake_context = ContextTypes.DEFAULT_TYPE(application=self.application, chat_id=None, user_id=None) await self.save_all_group_history_to_redis(fake_context) logger.info("关机前保存完毕。") if self.openai: logger.info("正在关闭 OpenAI (httpx) 客户端...") await self.openai.close() logger.info("OpenAI (httpx) 客户端已关闭。") if self.rss_client: logger.info("正在关闭 RSS (httpx) 客户端...") await self.rss_client.aclose() logger.info("RSS (httpx) 客户端已关闭。") # (V4.8 新增) if self.setu_client: logger.info("正在关闭 Setu (httpx) 客户端...") await self.setu_client.aclose() logger.info("Setu (httpx) 客户端已关闭。") # --- 命令处理器实现 --- def _get_help_text(self) -> str: """生成帮助文本。""" return """ 欢迎使用!我是您的 AI 助手。 **基础指令:** /help - 显示此帮助信息 /new - 忘记上下文,开始新对话 /switchmodel - 选择要对话的 AI 模型 /setu [tag] [tag...] - (V4.8) 随机获取一张图片 (可加 tag) /subrss - 订阅 RSS 更新到此聊天 /unsubrss - 取消此聊天的 RSS 订阅 **AI 预设 (System Prompt) [仅限私聊]:** /setpreset - (回复此消息或上传 .txt) 设置预设。格式: name: 预设名称 system: 你是一个... user: 示例输入 assistant: 示例回复 /listpresets - 查看我所有的预设 /switchpreset [名称] - 切换预设 /delpreset [名称] - 删除预设 **管理员指令:** /addgroup - (在群组中) 将此群组加入白名单 /blacklisttopic - (在*话题中*) 将此话题加入黑名单 /settrigger [词] - (在群组中) 设置一个词,包含它将必定触发回复 /savehistory - (任意位置) 手动保存群组缓存到Redis /addadmin [user_id] - 添加管理员 /deladmin [user_id] - 移除管理员 **使用方法:** 1. (群组) 管理员需要先使用 /addgroup 将群组加入白名单。 2. (群组) 我会回复所有话题,除非你使用 /blacklisttopic 禁言特定话题 (包括 "常规" 话题)。 3. (群组) 默认我只会随机回复 (15%)。如果设置了 /settrigger,包含触发词的消息我会 @ 您并 100% 回复。 4. (私聊) 您可以直接向我发送消息、图片或 .txt 文件。 """ async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """私聊 \start 命令处理器。""" await update.message.reply_text(self._get_help_text(), parse_mode=ParseMode.MARKDOWN) async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """\help 命令处理器。""" await update.message.reply_text(self._get_help_text(), parse_mode=ParseMode.MARKDOWN) async def new_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """ \new 命令处理器。 重构: 清除当前上下文。 (新增) 同时清除内存缓存。 """ user_id = update.effective_user.id chat_id = update.effective_chat.id chat_type = update.effective_chat.type thread_id = update.message.message_thread_id if update.message else None # V4.3 修复: 确保 thread_id 键一致 thread_id_key = thread_id if thread_id is not None else 0 # 1. 清除 Redis self.redis.clear_session(user_id, chat_id, thread_id) # 2. (新增) 如果是群组,也清除内存缓存 if chat_type in ("group", "supergroup"): # V4.3 修复: 使用正确的 context_key context_key = self.redis._get_context_key(user_id, chat_id, thread_id) async with self.cache_lock: if context_key in self.group_context_cache: del self.group_context_cache[context_key] logger.info(f"已为键 {context_key} 清除内存缓存。") logger.info(f"用户 {user_id} 在 {chat_id}:{thread_id_key} 使用了 /new") await update.message.reply_text("💡 好的,我们来开始一个全新的对话吧!(当前上下文已清除)") async def switch_model_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """\switchmodel 命令处理器。显示模型选择键盘。""" models = self._model_list_cache if not models: models = self.redis.get_cached_model_list() or [] if not models: await update.message.reply_text("抱歉,暂时无法获取模型列表。请稍后再试。") return current_model = self.redis.get_current_model( update.effective_user.id, self.config.DEFAULT_MODEL ) keyboard: List[List[InlineKeyboardButton]] = [] # 每行最多放 2 个模型 row = [] for i, model_id in enumerate(models): # 标记当前选中的模型 button_text = f"✅ {model_id}" if model_id == current_model else model_id # 按钮文本 (button_text) 也可能超长 (64 字节限制) # 我们需要按字节截断 max_bytes = 60 # 留 4 字节给 "..." button_text_bytes = button_text.encode('utf-8') if len(button_text_bytes) > max_bytes: # 从末尾开始解码,直到找到一个有效的 UTF-8 截断点 while max_bytes > 0: try: button_text = button_text_bytes[:max_bytes].decode('utf-8') + "..." break except UnicodeDecodeError: max_bytes -= 1 else: # 极端情况,无法截断 button_text = "Model..." row.append( InlineKeyboardButton( button_text, # 使用索引 (i) 作为 callback_data,避免超长 callback_data=f"model_select_{i}" ) ) if len(row) == 2: keyboard.append(row) row = [] if row: # 添加最后一行 (如果不满) keyboard.append(row) reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(f"请选择一个模型 (当前: {current_model}):", reply_markup=reply_markup) # --- 回调处理器 --- async def select_model_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """处理模型选择键盘的回调。""" query = update.callback_query await query.answer() # 立即响应回调,消除加载状态 user_id = query.from_user.id model_name = "" # 预定义 try: model_index_str = query.data.split("model_select_", 1)[1] model_index = int(model_index_str) models = self._model_list_cache if not models: models = self.redis.get_cached_model_list() or [] if not models: await query.edit_message_text("错误: 模型列表缓存已过期,请重试 /switchmodel。") return if 0 <= model_index < len(models): model_name = models[model_index] else: raise ValueError("模型索引越界") except (IndexError, TypeError, ValueError) as e: logger.warning(f"解析模型回调时出错: {e}. Data: {query.data}") await query.edit_message_text("选择出错,请重试。") return if not model_name: await query.edit_message_text("无法确定所选模型,请重试。") return self.redis.set_current_model(user_id, model_name) logger.info(f"用户 {user_id} 通过回调将模型切换为 {model_name}") # 更新原始消息,移除键盘 await query.edit_message_text(f"✅ 模型已切换为: {model_name}") # --- 管理员命令实现 --- async def _get_id_from_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> Optional[int]: """从命令参数或回复的消息中提取 User ID。""" # 1. 检查回复 if update.message.reply_to_message: return update.message.reply_to_message.from_user.id # 2. 检查参数 if context.args: try: return int(context.args[0]) except (ValueError, IndexError): return None return None async def add_admin_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """\addadmin 命令处理器。""" user_id_to_add = await self._get_id_from_command(update, context) if not user_id_to_add: await update.message.reply_text("请回复一个用户或提供 User ID。用法: /addadmin [user_id]") return if user_id_to_add in self.admin_users: # V4.5: 读内存 await update.message.reply_text(f"用户 {user_id_to_add} 已经是管理员了。") return if self.redis.add_admin(user_id_to_add): # 写 Redis async with self.permission_lock: # 写内存 self.admin_users.add(user_id_to_add) logger.info(f"管理员 {update.effective_user.id} 添加了新管理员 {user_id_to_add}") await update.message.reply_text(f"✅ 成功添加管理员: {user_id_to_add}") else: await update.message.reply_text("添加失败,请查看日志。") async def del_admin_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """\deladmin 命令处理器。""" user_id_to_remove = await self._get_id_from_command(update, context) if not user_id_to_remove: await update.message.reply_text("请回复一个用户或提供 User ID。用法: /deladmin [user_id]") return # 阻止 OP (第一个管理员) 被移除 if user_id_to_remove == self.config.OP_USER_ID: await update.message.reply_text("无法移除超级管理员 (OP)。") return if user_id_to_remove not in self.admin_users: # V4.5: 读内存 await update.message.reply_text(f"用户 {user_id_to_remove} 不是管理员。") return if self.redis.remove_admin(user_id_to_remove): # 写 Redis async with self.permission_lock: # 写内存 self.admin_users.discard(user_id_to_remove) logger.info(f"管理员 {update.effective_user.id} 移除了管理员 {user_id_to_remove}") await update.message.reply_text(f"✅ 成功移除管理员: {user_id_to_remove}") else: await update.message.reply_text("移除失败,请查看日志。") async def add_group_whitelist_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """(V4.3 新增) /addgroup 命令处理器。""" chat_id = update.message.chat.id chat_title = update.message.chat.title if chat_id in self.whitelisted_groups: # V4.5: 读内存 await update.message.reply_text(f"群组 '{chat_title}' (ID: {chat_id}) 已经在白名单中了。") return if self.redis.add_group_whitelist(chat_id): # 写 Redis async with self.permission_lock: # 写内存 self.whitelisted_groups.add(chat_id) logger.info(f"管理员 {update.effective_user.id} 将群组 '{chat_title}' ({chat_id}) 添加到白名单。") await update.message.reply_text(f"✅ 成功将群组 '{chat_title}' (ID: {chat_id}) 加入白名单。\n我现在会在此群组的*所有*未黑名单话题中回复消息。") else: await update.message.reply_text("添加白名单失败,请查看日志。") async def blacklist_topic_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """(V4.3 重构) /blacklisttopic 命令处理器。""" chat_id = update.message.chat.id chat_title = update.message.chat.title thread_id = update.message.message_thread_id # V4.3 修复: 统一 "常规" 话题 thread_id_key = thread_id if thread_id is not None else 0 topic_key = f"{chat_id}:{thread_id_key}" topic_name = "常规话题 (General)" if thread_id_key == 0 else f"话题ID {thread_id_key}" if topic_key in self.blacklisted_topics: # V4.5: 读内存 await update.message.reply_text(f"话题 '{topic_name}' (ID: {topic_key}) 已经在黑名单中了。") return if self.redis.add_blacklisted_topic(topic_key): # 写 Redis async with self.permission_lock: # 写内存 self.blacklisted_topics.add(topic_key) logger.info(f"管理员 {update.effective_user.id} 将话题 '{topic_name}' ({topic_key}) 添加到黑名单。") await update.message.reply_text(f"✅ 成功将*此话题* ({topic_name}, ID: {topic_key}) 加入黑名单。\n我现在会忽略此话题的消息。") else: await update.message.reply_text("添加黑名单失败,请查看日志。") async def set_trigger_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """/settrigger [词] 命令处理器。""" chat_id = update.message.chat.id if not context.args: await update.message.reply_text("请提供一个触发词。用法: /settrigger [词]") return trigger_word = context.args[0].strip() if not trigger_word: await update.message.reply_text("触发词不能为空。") return try: self.redis.set_group_trigger(chat_id, trigger_word) # 写 Redis async with self.permission_lock: # 写内存 self.group_triggers[chat_id] = trigger_word logger.info(f"管理员 {update.effective_user.id} 在群组 {chat_id} 设置了触发词: {trigger_word}") await update.message.reply_text(f"✅ 成功! 本群的回复触发词已设置为: \"{trigger_word}\"") except Exception as e: logger.error(f"设置群组 {chat_id} 触发词时出错: {e}", exc_info=True) await update.message.reply_text("设置触发词失败,请查看日志。") async def save_history_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """(新增) /savehistory (Admin) 手动保存并重置定时器。""" user_id = update.effective_user.id logger.info(f"管理员 {user_id} 手动触发了历史保存。") if not context.job_queue: logger.error(f"用户 {user_id} 尝试保存历史,但 JobQueue 不可用。") await update.message.reply_text("❌ 错误: JobQueue 不可用。") return # 1. 移除旧 if self.save_job: self.save_job.schedule_removal() logger.info("移除了旧的保存定时器。") # 2. 立即运行 await self.save_all_group_history_to_redis(context) # 3. 启动新 self.save_job = context.job_queue.run_repeating( self.save_all_group_history_to_redis, interval=3600, # 1 hour first=3600, # 1 小时后 name="hourly_save" ) logger.info("已启动新的保存定时器。") await update.message.reply_text("✅ 已手动保存所有群组缓存到 Redis,并重置1小时定时器。") # --- 预设 (Preset) 命令实现 --- def _parse_preset_text(self, text: str) -> Optional[Dict[str, Any]]: """ 辅助函数: 解析预设文本。 返回 {"name": str, "messages": List[Dict]} 或 None。 """ name: Optional[str] = None messages: List[Dict[str, str]] = [] matches = self.PRESET_REGEX.finditer(text) for match in matches: key = match.group(1).lower() value = match.group(2).strip() if key == "name": name = value elif key in ("user", "system", "assistant"): # 修复 V4.1: 将 content 包装为新格式 messages.append({"role": key, "content": [{"type": "text", "text": value}]}) if not name or not messages: return None return {"name": name, "messages": messages} async def set_preset_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """(新增) /setpreset 命令处理器 (文本或 .txt 文件)。""" user_id = update.effective_user.id text_to_parse: Optional[str] = None # 1. 检查 .txt 文件 if update.message.document and update.message.document.mime_type == "text/plain": try: doc_file = await update.message.document.get_file() with io.BytesIO() as file_bytes_io: await doc_file.download_to_memory(file_bytes_io) file_bytes_io.seek(0) text_to_parse = file_bytes_io.getvalue().decode('utf-8') logger.info(f"用户 {user_id} 正在通过 .txt 文件设置预设。") except Exception as e: logger.error(f"下载用户 {user_id} 的 .txt 预设文件时出错: {e}") await update.message.reply_text(f"抱歉,我无法读取 .txt 文件: {e}") return # 2. 检查命令文本 (如果不是文件) if not text_to_parse: # 移除 /setpreset 命令本身 if context.args: text_to_parse = update.message.text.split(None, 1)[-1] else: await update.message.reply_text( "用法错误。\n" "请使用 /setpreset [预设内容]...\n" "...或上传一个 .txt 文件并附上 /setpreset 命令。\n\n" "格式:\n" "name: 预设名称\n" "system: 你是一个...\n" "user: 示例...\n" "assistant: 好的..." ) return # 3. 解析 preset_data = self._parse_preset_text(text_to_parse) if not preset_data: await update.message.reply_text( "❌ 解析失败!\n" "请确保您的格式正确,并且至少包含 'name' 和一个 'system/user/assistant' 角色。" ) return name = preset_data["name"] messages = preset_data["messages"] # 4. 保存 if self.redis.set_preset(user_id, name, messages): logger.info(f"用户 {user_id} 成功设置了预设: {name}") await update.message.reply_text( f"✅ 预设已保存!\n" f"名称: **{name}**\n" f"包含 {len(messages)} 条消息。\n\n" f"使用 `/switchpreset {name}` 来激活它。", parse_mode=ParseMode.MARKDOWN ) else: await update.message.reply_text("❌ 预设保存失败,请查看日志。") async def list_presets_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """(新增) /listpresets 命令处理器。""" user_id = update.effective_user.id presets = self.redis.list_presets(user_id) active_preset = self.redis.get_active_preset_name(user_id) if not presets: await update.message.reply_text("您还没有保存任何预设。\n使用 /setpreset 来创建一个。") return message = "以下是您保存的预设:\n\n" for name in presets: if name == active_preset: message += f"▶️ **{name}** (当前激活)\n" else: message += f"• {name}\n" message += "\n使用 `/switchpreset [名称]` 来切换。" await update.message.reply_text(message, parse_mode=ParseMode.MARKDOWN) async def switch_preset_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """(新增) /switchpreset [name] 命令处理器。""" user_id = update.effective_user.id if not context.args: # 如果没有参数,则关闭预设 self.redis.set_active_preset(user_id, "") await update.message.reply_text("✅ 已关闭所有预设。") return preset_name = context.args[0].strip() # 检查预设是否存在 if not self.redis.get_preset(user_id, preset_name): await update.message.reply_text(f"❌ 找不到名为 '{preset_name}' 的预设。") return self.redis.set_active_preset(user_id, preset_name) logger.info(f"用户 {user_id} 切换预设为: {preset_name}") await update.message.reply_text(f"✅ 成功激活预设: **{preset_name}**", parse_mode=ParseMode.MARKDOWN) async def del_preset_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """(新增) /delpreset [name] 命令处理器。""" user_id = update.effective_user.id if not context.args: await update.message.reply_text("请提供要删除的预设名称。用法: /delpreset [名称]") return preset_name = context.args[0].strip() if self.redis.delete_preset(user_id, preset_name): logger.info(f"用户 {user_id} 删除了预设: {preset_name}") await update.message.reply_text(f"✅ 成功删除预设: {preset_name}") else: await update.message.reply_text(f"❌ 找不到名为 '{preset_name}' 的预设,或删除失败。") # --- (V4.5 新增) RSS 命令实现 --- async def sub_rss_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """(V4.5 新增) /subrss 命令处理器。""" chat_id = update.effective_chat.id if self.redis.add_rss_subscriber(chat_id): logger.info(f"RSS: 新订阅者: {chat_id}") await update.message.reply_text("✅ 成功订阅 RSS 更新到此聊天。") else: await update.message.reply_text("ℹ️ 此聊天已经订阅了 RSS。") async def unsub_rss_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """(V4.5 新增) /unsubrss 命令处理器。""" chat_id = update.effective_chat.id if self.redis.remove_rss_subscriber(chat_id): logger.info(f"RSS: 取消订阅: {chat_id}") await update.message.reply_text("✅ 已取消此聊天的 RSS 订阅。") else: await update.message.reply_text("ℹ️ 此聊天未订阅 RSS。") # --- (V4.8 新增) Setu 命令实现 --- async def setu_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """(V4.8) /setu [tag...] 命令处理器。""" user_id = update.effective_user.id chat_id = update.effective_chat.id thread_id = update.message.message_thread_id await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) base_url = "https://api.lolicon.app/setu/v2" params = {} params["size"] = "small" params["r18"] = 2 if context.args: params["tag"] = context.args logger.info(f"用户 {user_id} 在 {chat_id}:{thread_id} 请求带 tags 的 Setu: {context.args}") else: logger.info(f"用户 {user_id} 在 {chat_id}:{thread_id} 请求随机 Setu") try: # 1. 请求 API (V4.9 修复: 移除 async with) response = await self.setu_client.get(base_url, params=params) response.raise_for_status() data = response.json() if data.get("error") or not data.get("data"): logger.warning(f"Setu API 返回错误或空数据: {data.get('error')}") await update.message.reply_text("抱歉,API 返回错误或未找到图片。") return image_data = data["data"][0] image_url = image_data.get("urls", {}).get("small") if not image_url: logger.warning(f"Setu API 响应中缺少 small URL: {image_data}") await update.message.reply_text("抱歉,API 响应格式不正确,缺少图片 URL。") return # 2. 下载图片 (V4.9 修复: 移除 async with) # API 返回的 URL 可能需要替换为 pixiv.re 代理 image_url = image_url.replace("i.pixiv.cat", "i.pixiv.re") logger.debug(f"正在下载 Setu 图片: {image_url}") image_response = await self.setu_client.get(image_url) image_response.raise_for_status() image_bytes = image_response.content # 3. 准备信息 caption = ( f"Title: {image_data.get('title', 'N/A')}\n" f"Author: {image_data.get('author', 'N/A')}\n" f"PID: {image_data.get('pid', 'N/A')}" ) # 4. 发送图片 await update.message.reply_photo( photo=image_bytes, caption=caption, message_thread_id=thread_id ) # (V4.8) 更新活跃度时间戳 if update.message.chat.type in ("group", "supergroup"): context_key = self.redis._get_context_key(user_id, chat_id, thread_id) self.redis.update_last_response_time(context_key) except httpx.HTTPStatusError as e: logger.error(f"Setu API 请求失败 (状态码 {e.response.status_code}): {e.response.text}", exc_info=True) await update.message.reply_text(f"抱歉,获取图片失败 (HTTP 错误): {e.response.status_code}") except Exception as e: logger.error(f"Setu 命令执行失败: {e}", exc_info=True) await update.message.reply_text(f"抱歉,获取图片时遇到未知错误。") # --- (V4.5 新增) 内存缓存辅助方法 --- async def _add_to_group_cache(self, context_key: str, role: str, content: Any): """ (新增) 安全地向内存缓存添加消息并截断。 (V4.1: content 已经是新格式的列表) """ async with self.cache_lock: history = self.group_context_cache.get(context_key, []) history.append({"role": role, "content": content}) # 截断 truncated_history = history[-self.redis.CONTEXT_HISTORY_LIMIT:] self.group_context_cache[context_key] = truncated_history def _is_trigger_on_cooldown(self, chat_id: int, word: str) -> bool: """(V4.5) 检查内存中的触发词冷却""" cooldown_key = f"{chat_id}:{word}" expiry_time = self.trigger_cooldowns.get(cooldown_key) if expiry_time and time.time() < expiry_time: return True # Still on cooldown return False def _set_trigger_cooldown(self, chat_id: int, word: str): """(V4.5) 设置内存中的触发词冷却""" cooldown_key = f"{chat_id}:{word}" self.trigger_cooldowns[cooldown_key] = time.time() + self.TRIGGER_COOLDOWN_SEC async def load_group_history_from_redis(self): """(新增) 启动时从 Redis 加载所有群组上下文到内存。""" logger.info("正在从 Redis 加载群组历史到内存缓存...") count = 0 try: # 使用 scan 替代 keys,避免阻塞 cursor = 0 while True: # V4.2 修复: 确保 scan 使用 self.redis.redis (原始客户端) cursor, keys = self.redis.redis.scan(cursor, match="context:group:*", count=100) if keys: async with self.cache_lock: for key_bytes in keys: key = key_bytes.decode('utf-8') # 解码 raw_data = self.redis.redis.get(key) if raw_data: try: history = json.loads(raw_data) # 截断以防万一 self.group_context_cache[key] = history[-self.redis.CONTEXT_HISTORY_LIMIT:] count += 1 except json.JSONDecodeError: logger.warning(f"无法解析 Redis key {key} 的JSON数据") if cursor == 0: break logger.info(f"成功加载 {count} 个群组上下文到内存。") except Exception as e: logger.error(f"从 Redis 加载群组历史时出错: {e}", exc_info=True) async def save_all_group_history_to_redis(self, context: ContextTypes.DEFAULT_TYPE): """(新增) 将所有内存中的群组缓存保存回 Redis。""" logger.info("定时任务: 正在将群组历史缓存保存到 Redis...") count = 0 cache_copy = {} async with self.cache_lock: # 复制一份以避免在迭代时修改 cache_copy = self.group_context_cache.copy() if not cache_copy: logger.info("定时任务: 内存缓存为空,无需保存。") return try: # (upstash-redis 的 pipeline 不支持循环外定义) # 逐个 set for key, history in cache_copy.items(): self.redis.redis.set( key, json.dumps(history), ex=self.redis.SESSION_EXPIRATION_SEC ) count += 1 logger.info(f"定时任务: 成功将 {count} 个群组上下文保存到 Redis。") except Exception as e: logger.error(f"定时保存群组历史到 Redis 时出错: {e}", exc_info=True) # --- (新增) RSS 检查任务 --- async def check_rss_feed(self, context: ContextTypes.DEFAULT_TYPE): """(V4.5 升级) 每 10 分钟检查一次 RSS 订阅。""" logger.info("RSS: 正在检查 RSS... ") try: # (V4.5) 从 Redis 读取订阅者 subscriber_chat_ids = self.redis.get_rss_subscribers() if not subscriber_chat_ids: logger.info("RSS: 没有订阅者,跳过检查。") return # (V4.9 修复: 移除 async with) response = await self.rss_client.get(self.config.RSS_URL) response.raise_for_status() xmlText = response.text # 使用用户提供的正则表达式 match = re.search(r']', xmlText) if not match: logger.warning("RSS: 无法在 XML 中提取到文章链接。") return latest_link_id = match.group(1) last_known_link = self.redis.get_last_rss_link() if latest_link_id != last_known_link and latest_link_id: logger.info(f"RSS: 发现新文章! ID: {latest_link_id}. 正在推送给 {len(subscriber_chat_ids)} 个聊天。") message = f"主人,您订阅的魔装影姫cien更新啦!\n\nhttps://ci-en.dlsite.com/creator/4551/article/{latest_link_id}" tasks = [] for chat_id in subscriber_chat_ids: tasks.append(context.bot.send_message(chat_id, message)) results = await asyncio.gather(*tasks, return_exceptions=True) # 检查发送结果 for i, result in enumerate(results): if isinstance(result, Exception): chat_id = list(subscriber_chat_ids)[i] logger.error(f"RSS: 推送给 {chat_id} 失败: {result}") # 更新 Redis self.redis.set_last_rss_link(latest_link_id) else: logger.info("RSS: 未发现新文章。") except Exception as e: logger.error(f"RSS 任务失败: {e}", exc_info=True) # --- (V4.7 新增) 活跃度检查任务 --- async def check_group_activity(self, context: ContextTypes.DEFAULT_TYPE): """(V4.7) 每小时检查一次群组活跃度。""" logger.info("定时任务: 正在检查群组活跃度...") now = time.time() # 1. 从 Redis 读取所有时间戳 last_times = self.redis.get_all_last_response_times() # 2. 从内存读取黑名单 (V4.5) blacklisted_topics_mem = self.blacklisted_topics # 3. 获取 OP 的预设 (如果 OP 存在) op_preset_messages = [] if self.config.OP_USER_ID: op_preset_messages = self.redis.get_active_preset_messages(self.config.OP_USER_ID) # 4. 准备硬编码的提示 (V4.1 格式) reengage_prompt = [ {"type": "text", "text": "【现在在这个群聊里已经过去24小时没有人回复消息了,说句话活跃一下气氛吧】"} ] tasks_to_run = [] for context_key, last_time in last_times.items(): try: # 只检查群组 if not context_key.startswith("context:group:"): continue # 检查是否在黑名单中 (从内存读取) if context_key in blacklisted_topics_mem: continue # 检查是否超过 24 小时 (86400 秒) if now - last_time > 86400: logger.info(f"活跃度检查: 话题 {context_key} 已超过 24 小时未响应。") # 解析 chat_id 和 thread_id parts = context_key.split(':') chat_id = int(parts[2]) thread_id = int(parts[3]) # 准备 API 请求 history_for_api = op_preset_messages + [{"role": "user", "content": reengage_prompt}] # (V4.7) 添加一个辅助 coroutine 来处理单个请求 tasks_to_run.append(self.send_reengage_message( context, chat_id, thread_id, history_for_api, context_key )) except Exception as e: logger.error(f"处理活跃度检查 {context_key} 时出错: {e}") if tasks_to_run: logger.info(f"活跃度检查: 发现 {len(tasks_to_run)} 个不活跃的话题,正在发送消息...") await asyncio.gather(*tasks_to_run) else: logger.info("定时任务: 所有群组均在 24 小时内活跃。") async def send_reengage_message( self, context: ContextTypes.DEFAULT_TYPE, chat_id: int, thread_id: int, history_for_api: List[Dict[str, Any]], context_key: str ): """(V4.7) 活跃度检查的辅助函数,用于生成和发送消息。""" try: # 1. 生成 AI 回复 (使用默认模型) bot_response_text = await self.openai.generate_response( self.config.DEFAULT_MODEL, history_for_api ) is_error = False if bot_response_text is None or bot_response_text.startswith("抱歉,"): is_error = True logger.warning(f"活跃度检查: AI 为 {context_key} 生成了错误/空回复。") return # 不发送错误消息 # 2. 准备机器人回复 (V4.4 格式) bot_name = self.bot_name or "Assistant" # (V4.7) 活跃度消息不 @ 任何人 prefixed_response = f"{bot_name}: {bot_response_text}" assistant_api_content = [{"type": "text", "text": prefixed_response}] thread_id_to_send = thread_id if thread_id != 0 else None # 3. 发送消息 try: await context.bot.send_message( chat_id=chat_id, text=bot_response_text, # V4.7: 不带前缀 message_thread_id=thread_id_to_send, parse_mode=ParseMode.MARKDOWN ) except BadRequest as e: if "Can't parse entities" in str(e): logger.warning(f"活跃度检查 (Markdown 失败): {e}. 正在作为纯文本重试。") await context.bot.send_message( chat_id=chat_id, text=bot_response_text, # V4.7: 不带前缀 message_thread_id=thread_id_to_send, parse_mode=None ) else: raise # 抛出其他 BadRequest # 4. 更新内存缓存 await self._add_to_group_cache(context_key, "assistant", assistant_api_content) # 5. 更新 Redis 中的最后响应时间 (!!) self.redis.update_last_response_time(context_key) logger.info(f"活跃度检查: 成功发送消息到 {context_key} 并更新时间戳。") except Exception as e: logger.error(f"发送活跃度消息到 {context_key} 时失败: {e}", exc_info=True) # --- 核心消息处理器 --- async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """ 处理所有符合 ScopeFilter 的文本、图片和 .txt 文件消息。 (V4.6 更新) """ user = update.effective_user if not user: logger.warning("无法获取用户信息 (消息可能来自频道?),忽略。") return user_id = user.id chat_id = update.effective_chat.id chat_type = update.effective_chat.type thread_id = update.message.message_thread_id if update.message else None text_content = update.message.text or update.message.caption # (V4.4 新增) 获取用户名 user_name = user.first_name or user.full_name or "User" # --- 1. 准备用户消息 (多模态) --- user_api_content: List[Dict[str, Any]] = [] # 提取文本 (来自消息或图片标题) if text_content: # (V4.4) 群聊添加前缀 prefixed_text = f"{user_name}: {text_content}" if chat_type != "private" else text_content user_api_content.append({"type": "text", "text": prefixed_text}) # (新增) 提取 .txt 文件 if update.message.document and update.message.document.mime_type == "text/plain": try: doc_file = await update.message.document.get_file() with io.BytesIO() as file_bytes_io: await doc_file.download_to_memory(file_bytes_io) file_bytes_io.seek(0) file_text = file_bytes_io.getvalue().decode('utf-8') # (V4.4) 群聊添加前缀 file_info = f"[上传了 .txt 文件]: {file_text}" prefixed_text = f"{user_name}: {file_info}" if chat_type != "private" else file_info user_api_content.append({"type": "text", "text": prefixed_text}) logger.info(f"已为用户 {user_id} 成功读取 .txt 文件。") except Exception as e: logger.error(f"处理用户 {user_id} 的 .txt 文件时出错: {e}", exc_info=True) await update.message.reply_text("抱歉,我无法读取这个 .txt 文件。") return # 提取图片 (如果有) if update.message.photo: try: photo = update.message.photo[-1] # 获取最高清的图片 photo_file = await photo.get_file() with io.BytesIO() as file_bytes_io: await photo_file.download_to_memory(file_bytes_io) file_bytes_io.seek(0) base64_image = base64.b64encode(file_bytes_io.getvalue()).decode('utf-8') # (V4.1) 图像部分不需要名字前缀 image_url_payload = {"url": f"data:image/jpeg;base64,{base64_image}"} user_api_content.append({"type": "image_url", "image_url": image_url_payload}) logger.info(f"已为用户 {user_id} 成功编码图片。") except Exception as e: logger.error(f"处理用户 {user_id} 的图片时出错: {e}", exc_info=True) await update.message.reply_text("抱歉,我无法处理这张图片。") return # 如果没有提取到任何内容 (例如,只有贴纸),则忽略 if not user_api_content: logger.debug(f"来自用户 {user_id} 的消息没有可处理的内容,忽略。") return # --- 2. (新增) 立即更新群组内存缓存 (用户消息) --- if chat_type in ("group", "supergroup"): context_key = self.redis._get_context_key(user_id, chat_id, thread_id) await self._add_to_group_cache(context_key, "user", user_api_content) # --- 3. 决策: 是否回复? (群组逻辑) --- should_reply = False is_random_reply = False # (V4.5 新增) 标记是否为随机回复 if chat_type == "private": should_reply = True elif chat_type in ("group", "supergroup"): trigger_word = self.group_triggers.get(chat_id) # V4.5: 读内存 # 修复 V4.0: 移除 \b (单词边界),实现包含匹配 if text_content and trigger_word and re.search(re.escape(trigger_word), text_content, re.IGNORECASE): # V4.5: 读/写内存冷却 if not self._is_trigger_on_cooldown(chat_id, trigger_word): should_reply = True is_random_reply = False # (V4.5) 这是触发词回复 logger.info(f"群组 {chat_id} 触发词 '{trigger_word}' 被命中,设置冷却。") self._set_trigger_cooldown(chat_id, trigger_word) else: logger.debug(f"群组 {chat_id} 触发词 '{trigger_word}' 仍在冷却中,忽略。") return # 冷却中,不回复 if not should_reply: if random.random() < self.DEFAULT_GROUP_REPLY_CHANCE: should_reply = True is_random_reply = True # (V4.5) 这是随机回复 logger.info(f"群组 {chat_id} 随机回复 (15%) 命中。") else: logger.debug(f"群组 {chat_id} 随机回复 (15%) 未命中,忽略。") return # 未命中随机,不回复 # --- 决策完毕 --- if not should_reply: return # 最终决定不回复 # 4. 发送 "正在输入..." 状态 try: await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) except Exception: pass # 忽略错误 # 5. 获取会话、模型和历史 model = self.redis.get_current_model(user_id, self.config.DEFAULT_MODEL) preset_messages = self.redis.get_active_preset_messages(user_id) # --- 重构: 获取历史记录 --- history_from_cache = [] if chat_type == "private": # 私聊: 从 Redis 读取 history_from_cache = self.redis.get_conversation_history(user_id, chat_id, thread_id) elif chat_type in ("group", "supergroup"): # 群组: 从内存缓存读取 context_key = self.redis._get_context_key(user_id, chat_id, thread_id) async with self.cache_lock: # 注意: 缓存中已包含当前的用户消息,所以我们不再需要 + user_api_content history_from_cache = self.group_context_cache.get(context_key, []) # 准备 API 请求历史 if chat_type == "private": # 私聊: 预设 + Redis历史 + 当前消息 history_for_api = preset_messages + history_from_cache + [{"role": "user", "content": user_api_content}] else: # 群组: 预设 + 内存历史 (已包含当前消息) history_for_api = preset_messages + history_from_cache # 6. 调用 API bot_response_text = await self.openai.generate_response(model, history_for_api) # --- 7. 错误检查 --- is_error = False if bot_response_text is None: is_error = True bot_response_text = "抱歉,AI 响应为空 (None),请重试。" elif bot_response_text.startswith("抱歉,"): is_error = True # 8. (V4.6 更改) @ 提及 (仅限触发词回复) mention_prefix = "" if chat_type != "private" and not is_random_reply: # 仅在群组的*触发词*回复时 try: # [⁠](tg://user?id=12345) (使用 U+2060 WORD JOINER 作为 "不可见" 链接文本) mention_prefix = f"[\u2060](tg://user?id={user_id}) " logger.info(f"为触发词回复添加 @{user_name} 提及。") except Exception as e: logger.warning(f"为触发词回复添加 @ 提及失败: {e}") # 失败也无妨,继续执行 # (V4.5) 将提及(如果有)添加到回复文本的开头 bot_response_text = mention_prefix + bot_response_text # 9. 回复用户 try: # (新增 V4.2) 尝试 Markdown await update.message.reply_text( bot_response_text, message_thread_id=thread_id, parse_mode=ParseMode.MARKDOWN ) except BadRequest as e: # (V4.3 修复) 如果 Markdown 解析失败 (例如格式错误),则作为普通文本发送 # 停止使用 reply_text 以避免 "Message not found" if "Can't parse entities" in str(e): logger.warning(f"Markdown 解析失败: {e}. 正在作为纯文本重试。") try: await context.bot.send_message( chat_id=chat_id, text=bot_response_text, message_thread_id=thread_id, parse_mode=None # 纯文本 ) except Exception as fallback_e: logger.error(f"纯文本回退发送失败: {fallback_e}", exc_info=True) else: logger.error(f"发送消息时发生意外的 BadRequest: {e}", exc_info=True) except Exception as e: logger.error(f"发送回复时发生未知错误: {e}", exc_info=True) # 10. (重要) 只有在 *没有* 错误时才保存上下文 if not is_error: # (V4.4 新增) 为群聊 AI 回复添加前缀 if chat_type != "private": bot_name = self.bot_name or "Assistant" user_name = user.first_name or "User" # (V4.5 修复) 存储 *不带* @ 提及的原始回复 original_response_text = bot_response_text[len(mention_prefix):] prefixed_response = f"{bot_name}: [回复 @{user_name}]: {original_response_text}" assistant_api_content = [{"type": "text", "text": prefixed_response}] else: assistant_api_content = [{"type": "text", "text": bot_response_text}] if chat_type == "private": # 私聊: 将 "用户" 和 "助手" 都写入 Redis self.redis.add_to_conversation(user_id, chat_id, thread_id, "user", user_api_content) self.redis.add_to_conversation(user_id, chat_id, thread_id, "assistant", assistant_api_content) # (V4.7) 私聊也更新时间戳 (如果用户想设置活跃度定时器) # context_key = self.redis._get_context_key(user_id, chat_id, thread_id) # self.redis.update_last_response_time(context_key) elif chat_type in ("group", "supergroup"): # 群组: 仅将 "助手" 写入内存缓存 (用户消息已在第2步写入) context_key = self.redis._get_context_key(user_id, chat_id, thread_id) await self._add_to_group_cache(context_key, "assistant", assistant_api_content) # (V4.7) 更新群组的最后响应时间 self.redis.update_last_response_time(context_key) else: logger.info(f"检测到 AI 错误,跳过历史记录保存。 (User: {user_id}, Chat: {chat_id})") # --- 6. 启动器 --- def main(): """ 主函数:初始化所有组件并启动机器人。 """ openai_client = None # 在 try 外部定义 app = None # 在 try 外部定义 try: # 1. 初始化 config = Config() redis_manager = RedisManager(config) openai_client = OpenAIClient(config) bot = TelegramBot(config, redis_manager, openai_client) # 2. 注册处理器 bot.setup_handlers() # 3. 获取 application 实例 app = bot.application # 4. 移除手动的 await app.initialize() 和其他异步设置 # 它们现在由 post_init 自动处理。 # 5. 开始轮询 (这是阻塞的,直到机器人停止) logger.info("机器人启动,开始轮⚫询...") app.run_polling(allowed_updates=Update.ALL_TYPES) except ValueError as e: # Config 缺失必要变量时会触发 logger.critical(f"启动失败: {e}") # 退出,因为没有配置无法运行 return except Exception as e: logger.critical(f"机器人主程序遇到致命错误: {e}", exc_info=True) finally: # 优雅关闭 # post_shutdown_cleanup 会自动处理它们 if app: logger.info("Telegram 轮询已停止。") logger.info("机器人已停止。") if __name__ == "__main__": try: main() except KeyboardInterrupt: logger.info("检测到 Ctrl+C,正在关闭...")