test-sk / app.py
letterm's picture
Update app.py
5de638d verified
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
一个功能齐全的、基于OpenAI兼容API的Telegram机器人。
V4.9 更新:
- (修复) 修复 `RuntimeError: Cannot reopen a client instance` (httpx):
- 移除了 `setu_command` 和 `check_rss_feed` 中多余的 `async with` 块。
- `httpx` 客户端现在被正确地在 `__init__` 中初始化,并在 `post_shutdown_cleanup` 中关闭,不再在每次命令调用时被错误地关闭。
- (V4.8) 新增 /setu 命令 (lolicon API)。
- (V4.7) 24小时群组活跃度定时器 (/check_group_activity)。
- (V4.6 修复) 修复 `AttributeError: 'str' object has no attribute 'decode'` (移除 .decode())。
- (V4.6 更改) @ 提及逻辑: (随机回复) 不再 @;(触发词回复) 现在 @ 消息发送者。
- (V4.5) 增加全面的内存缓存层 (权限/触发词/冷却)。
- (V4.5) 动态 RSS 订阅 (/subrss, /unsubrss)。
- (V4.4) 修复 绝对黑名单 (ScopeFilter 应用于基础指令)。
- (V4.4) 重构 群组历史消息格式 (用户名字: 消息 / 机器人名字: [回复 @用户]: 消息)。
- (V4.3) 修复 Markdown 回退错误 (Message to be replied not found)。
- (V4.3) 重构权限为 "群组白名单/话题黑名单" 机制 (/addgroup, /blacklisttopic)。
- (V4.2) 修复 httpx.ConnectError (图片) (verify=False)。
- (V4.1) 修复了触发词匹配逻辑 (包含匹配)。
- (V4.1) 重构了群组上下文管理 (内存缓存)。
- (V4.1) 修复了 OpenAIClient 中 'list index out of range' 的错误。
- (V4.1) 历史记录清洁 (不保存错误)。
- (V4.1) 增加了 AI 预设 (System Prompt) 管理系统。
- (V4.1) 增加了 .txt 文件处理。
- (V4.1) 修复了 API 消息格式 (text, image_url, assistant)。
功能:
- 使用 python-telegram-bot 框架。
- 对接 OpenAI 兼容的 API (例如 Groq, Mistral, 或自定义端点)。
- 使用 Upstash Redis 缓存对话上下文。
- 支持多模态 (图片) 消息,图片以 Base64 格式存入 Redis 并发送到 API。
- 动态模型切换 ( \switchmodel ),启动时自动缓存模型列表。
- 严格的范围控制:响应私聊 和 (已白名单群组的*未*黑名单话题)。
- 基于 Redis 的动态管理员和白/黑名单系统。
- 详细的日志记录和模块化类结构。
所需库:
- python-telegram-bot
- upstash-redis
- httpx (替代 openai 库)
- python-dotenv
"""
import os
import logging
import json
import uuid
import io
import base64
import asyncio
import re # <-- 新增: 用于触发词和预设解析
import random # <-- 新增: 用于随机回复
import time # <-- 新增: 用于内存冷却
from typing import List, Dict, Any, Optional, Set
# 运行: pip install python-telegram-bot upstash-redis httpx python-dotenv
import httpx # 使用 httpx 直接请求 API
from dotenv import load_dotenv
from upstash_redis import Redis
# from openai import OpenAI, AsyncOpenAI # 不再使用 OpenAI SDK
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, BotCommand, Message
from telegram.constants import ChatAction, ParseMode # <-- 新增 ParseMode
from telegram.error import BadRequest # <-- 新增 BadRequest 用于 Markdown 回退
from telegram.ext import (
Application,
ApplicationBuilder,
ContextTypes,
CommandHandler,
MessageHandler,
CallbackQueryHandler,
filters,
Job, # <-- 新增: 用于定时器
JobQueue # <-- 新增: 用于定时器
)
# --- 日志配置 ---
# 配置日志记录
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
# 将 httpx 的日志级别调高,因为它在 DEBUG 级别下过于嘈杂
logging.getLogger("httpx").setLevel(logging.WARNING)
# (新增) 禁用 httpx 的 SSL 警告
import warnings
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
# --- 1. 配置类 ---
class Config:
"""
管理所有从环境变量加载的配置。
"""
def __init__(self):
# 加载 .env 文件 (如果存在)
load_dotenv()
logger.info("正在加载环境变量...")
# Telegram
self.TELEGRAM_BOT_TOKEN: str = self.get_env_required("TELEGRAM_BOT_TOKEN")
# OpenAI 兼容 API
self.OPENAI_API_URL: str = self.get_env_required("OPENAI_COMPATIBLE_URL")
self.OPENAI_API_KEY: str = self.get_env_required("OPENAI_COMPATIBLE_KEY")
self.DEFAULT_MODEL: str = os.getenv("DEFAULT_MODEL", "gpt-3.5-turbo")
# Upstash Redis (upstash-redis 库需要这两个)
self.UPSTASH_REDIS_URL: str = self.get_env_required("UPSTASH_REDIS_REST_URL")
self.UPSTASH_REDIS_TOKEN: str = self.get_env_required("UPSTASH_REDIS_REST_TOKEN")
# 权限
self.ADMIN_USERS: Set[int] = self.parse_int_set_from_env("ADMIN_USERS")
# (新增) RSS
self.RSS_URL: str = os.getenv("RSS_URL", "https://ci-en.dlsite.com/creator/4551/article/xml/rss")
# (V4.5) 移除 RSS_CHAT_IDS, 改为动态订阅
if not self.ADMIN_USERS:
logger.warning("未在环境变量中定义 'ADMIN_USERS'。某些管理功能将受限。")
logger.info("环境变量加载完毕。")
def get_env_required(self, var_name: str) -> str:
"""获取必需的环境变量,如果缺失则抛出异常。"""
value = os.getenv(var_name)
if value is None:
logger.error(f"严重错误: 环境变量 '{var_name}' 未设置。")
raise ValueError(f"环境变量 '{var_name}' 必须被设置。")
return value
def parse_int_set_from_env(self, var_name: str) -> Set[int]:
"""从逗号分隔的环境变量字符串解析为整数集合。"""
value_str = os.getenv(var_name)
if not value_str:
return set()
try:
return {int(x.strip()) for x in value_str.split(',') if x.strip()}
except ValueError:
logger.error(f"无法解析环境变量 '{var_name}'。请确保它是逗号分隔的整数ID。")
return set()
@property
def OP_USER_ID(self) -> Optional[int]:
"""
获取"超级管理员" (OP) 的ID,即 ADMIN_USERS 列表中的第一个。
用于接收启动通知。
"""
if self.ADMIN_USERS:
return next(iter(self.ADMIN_USERS))
return None
# --- 2. Redis 管理类 ---
class RedisManager:
"""
封装所有与 Upstash Redis 的交互。
(V4.5: 主要负责*写入*和*初始加载*,读取操作由内存缓存处理)
"""
# Redis 键名常量
KEY_MODEL_LIST = "bot:model_list"
KEY_ADMIN_USERS = "bot:admin_users"
# V4.3 重构:
KEY_WHITELISTED_GROUPS = "bot:whitelisted_groups" # (Set) 允许的群组 ID
KEY_BLACKLISTED_TOPICS = "bot:blacklisted_topics" # (Set) 禁言的话题 Key (chat_id:thread_id)
KEY_GROUP_TRIGGERS = "bot:group_triggers" # HASH, chat_id -> trigger_word
# 预设键
KEY_PRESETS_PREFIX = "bot:presets:" # HASH, {user_id} -> {preset_name} -> json(messages)
KEY_ACTIVE_PRESET_PREFIX = "bot:active_preset:" # STRING, {user_id} -> preset_name
# RSS 键
KEY_LAST_RSS_LINK = "bot:last_rss_link" # STRING, 存储最新的 article_id
KEY_RSS_SUBSCRIPTIONS = "bot:rss_subscriptions" # (V4.5 新增) Set, 存储订阅的 chat_id
# V4.7 新增: 活跃度
KEY_LAST_RESPONSE_TIMES = "bot:last_response_times" # HASH, context_key -> timestamp
# 会话超时时间 (例如: 24 小时)
SESSION_EXPIRATION_SEC = 86400 # 24 * 60 * 60
# 上下文历史消息限制
CONTEXT_HISTORY_LIMIT = 200
def __init__(self, config: Config):
try:
self.redis = Redis(
url=config.UPSTASH_REDIS_URL,
token=config.UPSTASH_REDIS_TOKEN
)
# 测试连接
self.redis.ping()
logger.info("成功连接到 Upstash Redis。")
except Exception as e:
logger.error(f"无法连接到 Upstash Redis: {e}", exc_info=True)
raise
# --- 键生成辅助方法 ---
def _get_context_key(self, user_id: int, chat_id: int, thread_id: Optional[int]) -> str:
"""
重构: 获取上下文的唯一键。
- 私聊: context:user:{user_id}
- 群组话题: context:group:{chat_id}:{thread_id_key}
"""
if chat_id > 0: # V4.3 修复: 私聊 (chat_id == user_id)
return f"context:user:{user_id}"
# V4.3 重构: 统一处理 "常规" 话题 (None -> 0)
thread_id_key = thread_id if thread_id is not None else 0
return f"context:group:{chat_id}:{thread_id_key}"
def _user_model_key(self, user_id: int) -> str:
"""获取跟踪用户当前模型的键 (保持不变)。"""
return f"user:{user_id}:current_model"
def _user_presets_key(self, user_id: int) -> str:
"""(新增) 获取用户预设 HASH 的键。"""
return f"{self.KEY_PRESETS_PREFIX}{user_id}"
def _user_active_preset_key(self, user_id: int) -> str:
"""(新增) 获取用户当前激活预设的键。"""
return f"{self.KEY_ACTIVE_PRESET_PREFIX}{user_id}"
# --- 启动初始化 ---
def initialize_from_env(self, initial_admins: Set[int]): # V4.2: 移除了 initial_chats
"""
在启动时,将环境变量中的初始管理员同步到 Redis。
(V4.2: 话题白名单现在必须通过命令添加)
"""
try:
if initial_admins:
# SADD 返回成功添加的新成员数量
added_admins = self.redis.sadd(self.KEY_ADMIN_USERS, *initial_admins)
logger.info(f"已将 {added_admins} 个新管理员从环境变量同步到 Redis。")
except Exception as e:
logger.error(f"初始化 Redis 数据时出错: {e}", exc_info=True)
# --- 会话管理 (私聊) ---
def clear_session(self, user_id: int, chat_id: int, thread_id: Optional[int]):
"""重构: 清除当前上下文 (/new)。(私聊或群组均可调用)"""
context_key = self._get_context_key(user_id, chat_id, thread_id)
# 注意: 这只会清除 Redis。群组的内存缓存将在下一次消息时自动重置。
# (或者在 /new 命令中单独清除内存)
self.redis.delete(context_key)
logger.info(f"已为键 {context_key} 清除 Redis 会话。")
def get_conversation_history(self, user_id: int, chat_id: int, thread_id: Optional[int]) -> List[Dict[str, Any]]:
"""
重构: 根据上下文键获取对话历史。
(现在主要用于私聊,或群组的初始加载)
"""
context_key = self._get_context_key(user_id, chat_id, thread_id)
raw_data = self.redis.get(context_key)
if not raw_data:
return []
try:
history = json.loads(raw_data)
if not isinstance(history, list):
logger.warning(f"上下文 {context_key} 的数据格式不正确 (非列表),已重置。")
return []
# 刷新 TTL
self.redis.expire(context_key, self.SESSION_EXPIRATION_SEC)
# 只返回最后 200 条
return history[-self.CONTEXT_HISTORY_LIMIT:]
except json.JSONDecodeError:
logger.error(f"无法解析上下文 {context_key} 的JSON数据,已重置。")
return []
def add_to_conversation(self, user_id: int, chat_id: int, thread_id: Optional[int], role: str, content: Any):
"""
重构: 向指定的上下文添加一条消息。
(现在主要用于私聊)
(V4.1: content 已经是新格式的列表)
"""
context_key = self._get_context_key(user_id, chat_id, thread_id)
try:
raw_data = self.redis.get(context_key)
if not raw_data:
history = []
else:
try:
history = json.loads(raw_data)
if not isinstance(history, list):
history = []
except json.JSONDecodeError:
history = []
# 添加新消息 (content 已经是新格式)
history.append({"role": role, "content": content})
# 写入 (私聊立即写)
self.redis.set(context_key, json.dumps(history), ex=self.SESSION_EXPIRATION_SEC)
logger.debug(f"已将上下文 {context_key}{role} 消息存入 Redis。")
except Exception as e:
logger.error(f"向上下文 {context_key} (私聊) 添加消息时出错: {e}", exc_info=True)
# --- 模型管理 ---
def get_current_model(self, user_id: int, default_model: str) -> str:
"""获取用户的首选模型,如果未设置则返回默认值。"""
model = self.redis.get(self._user_model_key(user_id))
# V4.6 修复: 移除 .decode()
return model if model else default_model
def set_current_model(self, user_id: int, model_name: str):
"""设置用户的首选模型。"""
self.redis.set(self._user_model_key(user_id), model_name)
logger.info(f"用户 {user_id} 将模型切换为: {model_name}")
def cache_model_list(self, models: List[str]):
"""将从API获取的模型列表缓存到 Redis。"""
if not models:
return
# 缓存 1 小时
self.redis.set(self.KEY_MODEL_LIST, json.dumps(models), ex=3600)
logger.info(f"已缓存 {len(models)} 个模型到 Redis。")
def get_cached_model_list(self) -> Optional[List[str]]:
"""从 Redis 获取缓存的模型列表。"""
raw_data = self.redis.get(self.KEY_MODEL_LIST)
if not raw_data:
return None
try:
return json.loads(raw_data)
except json.JSONDecodeError:
return None
# --- 权限管理 (Admin) (V4.5: 读/写) ---
def get_admin_users(self) -> Set[int]:
"""(V4.5) 从 Redis *读取*所有管理员ID (用于启动加载)。"""
try:
# V4.6 修复: 移除 .decode()
return {int(uid) for uid in self.redis.smembers(self.KEY_ADMIN_USERS)}
except Exception as e:
logger.error(f"获取管理员列表时出错: {e}", exc_info=True)
return set()
def add_admin(self, user_id: int) -> bool:
"""(V4.5) *写入*管理员到 Redis。"""
try:
return self.redis.sadd(self.KEY_ADMIN_USERS, user_id) == 1
except Exception as e:
logger.error(f"添加管理员 {user_id} 时出错: {e}")
return False
def remove_admin(self, user_id: int) -> bool:
"""(V4.5) 从 Redis *移除*管理员。"""
try:
return self.redis.srem(self.KEY_ADMIN_USERS, user_id) == 1
except Exception as e:
logger.error(f"移除管理员 {user_id} 时出错: {e}")
return False
# --- 权限管理 (V4.5: Whitelist Group / Blacklist Topic) (读/写) ---
# (V4.5 新增) 群组白名单
def get_whitelisted_groups(self) -> Set[int]:
"""(V4.5) 从 Redis *读取*所有白名单群组 (用于启动加载)。"""
try:
# V4.6 修复: 移除 .decode()
return {int(cid) for cid in self.redis.smembers(self.KEY_WHITELISTED_GROUPS)}
except Exception as e:
logger.error(f"获取白名单群组时出错: {e}", exc_info=True)
return set()
def add_group_whitelist(self, chat_id: int) -> bool:
"""(V4.5) *写入*群组到白名单 Redis。"""
try:
return self.redis.sadd(self.KEY_WHITELISTED_GROUPS, chat_id) == 1
except Exception as e:
logger.error(f"添加白名单群组 {chat_id} 时出错: {e}")
return False
# (V4.5 重构) 话题黑名单
def get_blacklisted_topics(self) -> Set[str]:
"""(V4.5) 从 Redis *读取*所有黑名单话题 (用于启动加载)。"""
try:
# V4.6 修复: 移除 .decode()
return {k for k in self.redis.smembers(self.KEY_BLACKLISTED_TOPICS)}
except Exception as e:
logger.error(f"获取黑名单话题时出错: {e}", exc_info=True)
return set()
def add_blacklisted_topic(self, topic_key: str) -> bool:
"""(V4.5) *写入*话题到黑名单 Redis。"""
try:
return self.redis.sadd(self.KEY_BLACKLISTED_TOPICS, topic_key) == 1
except Exception as e:
logger.error(f"添加黑名单话题 {topic_key} 时出错: {e}")
return False
# --- 群组触发词管理 (V4.5: 读/写) ---
def get_all_group_triggers(self) -> Dict[int, str]:
"""(V4.5) 从 Redis *读取*所有群组触发词 (用于启动加载)。"""
try:
# V4.6 修复: 移除 .decode()
return {int(k): v for k, v in self.redis.hgetall(self.KEY_GROUP_TRIGGERS).items()}
except Exception as e:
logger.error(f"获取所有群组触发词时出错: {e}", exc_info=True)
return {}
def set_group_trigger(self, chat_id: int, word: str):
"""(V4.5) *写入*群组触发词到 Redis。"""
self.redis.hset(self.KEY_GROUP_TRIGGERS, str(chat_id), word)
# --- 预设 (Preset) 管理 ---
def set_preset(self, user_id: int, name: str, messages: List[Dict[str, Any]]) -> bool:
"""
保存一个预设 (HSET)。
(V4.1: messages 已经是新格式)
"""
try:
key = self._user_presets_key(user_id)
self.redis.hset(key, name, json.dumps(messages))
return True
except Exception as e:
logger.error(f"为用户 {user_id} 保存预设 '{name}' 时出错: {e}")
return False
def get_preset(self, user_id: int, name: str) -> Optional[List[Dict[str, Any]]]:
"""获取一个特定的预设 (HGET)。"""
try:
key = self._user_presets_key(user_id)
raw_data = self.redis.hget(key, name)
if not raw_data:
return None
return json.loads(raw_data)
except Exception as e:
logger.error(f"为用户 {user_id} 获取预设 '{name}' 时出错: {e}")
return None
def delete_preset(self, user_id: int, name: str) -> bool:
"""删除一个预设 (HDEL)。"""
try:
key = self._user_presets_key(user_id)
# 检查是否删除了当前激活的预设
active_preset = self.get_active_preset_name(user_id)
if active_preset == name:
self.set_active_preset(user_id, "") # 清空激活的预设
return self.redis.hdel(key, name) > 0
except Exception as e:
logger.error(f"为用户 {user_id} 删除预设 '{name}' 时出错: {e}")
return False
def list_presets(self, user_id: int) -> List[str]:
"""列出用户的所有预设名称 (HKEYS)。"""
try:
key = self._user_presets_key(user_id)
# (V4.6) 修复 bytes decode
return self.redis.hkeys(key)
except Exception as e:
logger.error(f"为用户 {user_id} 列出预设时出错: {e}")
return []
def set_active_preset(self, user_id: int, name: str):
"""设置当前激活的预设 (SET)。"""
key = self._user_active_preset_key(user_id)
if name:
self.redis.set(key, name)
else:
# 如果 name 为空,则删除键
self.redis.delete(key)
def get_active_preset_name(self, user_id: int) -> Optional[str]:
"""获取当前激活的预设名称 (GET)。"""
key = self._user_active_preset_key(user_id)
# (V4.6) 修复 bytes decode
return self.redis.get(key)
def get_active_preset_messages(self, user_id: int) -> List[Dict[str, Any]]:
"""(核心) 获取当前激活的预设的消息列表。"""
name = self.get_active_preset_name(user_id)
if not name:
return []
messages = self.get_preset(user_id, name)
return messages if messages else []
# --- (新增) RSS 管理 ---
def get_last_rss_link(self) -> Optional[str]:
"""获取最后已知的 RSS 链接 ID。"""
# (V4.6) 修复 bytes decode
return self.redis.get(self.KEY_LAST_RSS_LINK)
def set_last_rss_link(self, link_id: str):
"""设置最新的 RSS 链接 ID。"""
self.redis.set(self.KEY_LAST_RSS_LINK, link_id)
def get_rss_subscribers(self) -> Set[int]:
"""(V4.5) 从 Redis *读取*所有 RSS 订阅者。"""
try:
# (V4.6) 修复 bytes decode
return {int(cid) for cid in self.redis.smembers(self.KEY_RSS_SUBSCRIPTIONS)}
except Exception as e:
logger.error(f"获取 RSS 订阅者时出错: {e}")
return set()
def add_rss_subscriber(self, chat_id: int) -> bool:
"""(V4.5) *写入* RSS 订阅者。"""
try:
return self.redis.sadd(self.KEY_RSS_SUBSCRIPTIONS, chat_id) == 1
except Exception as e:
logger.error(f"添加 RSS 订阅者 {chat_id} 时出错: {e}")
return False
def remove_rss_subscriber(self, chat_id: int) -> bool:
"""(V4.5) *移除* RSS 订阅者。"""
try:
return self.redis.srem(self.KEY_RSS_SUBSCRIPTIONS, chat_id) == 1
except Exception as e:
logger.error(f"移除 RSS 订阅者 {chat_id} 时出错: {e}")
return False
# --- (V4.7 新增) 活跃度管理 ---
def update_last_response_time(self, context_key: str):
"""(V4.7) 更新一个话题的最后响应时间戳。"""
try:
self.redis.hset(self.KEY_LAST_RESPONSE_TIMES, context_key, time.time())
except Exception as e:
logger.error(f"更新最后响应时间失败 {context_key}: {e}")
def get_all_last_response_times(self) -> Dict[str, float]:
"""(V4.7) 获取所有话题的最后响应时间戳 (用于启动检查)。"""
try:
# V4.6 修复: 移除 .decode()
return {k: float(v) for k, v in self.redis.hgetall(self.KEY_LAST_RESPONSE_TIMES).items()}
except Exception as e:
logger.error(f"获取所有最后响应时间失败: {e}", exc_info=True)
return {}
# --- 3. OpenAI API 客户端类 (使用 httpx) ---
class OpenAIClient:
"""
封装所有与 OpenAI 兼容 API 的交互。
使用 httpx.AsyncClient 进行异步 API 请求。
"""
def __init__(self, config: Config):
self.base_url = config.OPENAI_API_URL
self.headers = {
"Authorization": f"Bearer {config.OPENAI_API_KEY}",
"Content-Type": "application/json"
}
# 修复 V4.2: 禁用 SSL 验证 (verify=False) 以修复 ConnectError
logger.warning("!!! 安全警告: 正在为 OpenAIClient 禁用 SSL 验证 (verify=False)。")
logger.warning("!!! 这可以修复 ConnectError,但会带来安全风险。请确保 API 端点可信。")
self.client = httpx.AsyncClient(
base_url=self.base_url,
headers=self.headers,
timeout=30.0, # 设置 30 秒超时
follow_redirects=True,
verify=False # <-- 修复: 禁用 SSL 验证
)
self.default_model = config.DEFAULT_MODEL
logger.info(f"OpenAI 客户端 (httpx) 已初始化,指向: {config.OPENAI_API_URL}")
async def get_models(self) -> List[str]:
"""
从 API 获取可用模型列表。
"""
try:
logger.info("正在从 API (httpx) 获取模型列表...")
# 兼容的 API 端点通常是 /v1/models
response = await self.client.get("/v1/models")
response.raise_for_status() # 如果状态码不是 2xx,则抛出异常
models_data = response.json()
# 过滤并返回所有模型ID (接受所有模型)
model_ids = [model['id'] for model in models_data.get('data', []) if model.get('id')]
logger.info(f"成功获取到 {len(model_ids)} 个模型。")
return sorted(model_ids)
except httpx.HTTPStatusError as e:
logger.error(f"API 请求失败 (状态码 {e.response.status_code}): {e.response.text}", exc_info=True)
return []
except Exception as e:
logger.error(f"从 API 获取模型列表失败 (httpx): {e}", exc_info=True)
return []
async def generate_response(self, model: str, history: List[Dict[str, Any]]) -> Optional[str]:
"""
调用聊天补全 (Chat Completions) API。
'history' 必须是符合 API 格式的列表。
修复: 增加了对空 'choices' 列表的检查,防止 'list index out of range'。
"""
logger.debug(f"向模型 {model} (httpx) 发送请求,包含 {len(history)} 条历史消息。")
if history and "image_url" in str(history[-1]):
logger.debug("请求中包含图片。")
# 兼容的 API 端点通常是 /v1/chat/completions
endpoint = "/v1/chat/completions"
payload = {
"model": model,
"messages": history,
"stream": False # 不使用流式响应,简化处理
}
try:
response = await self.client.post(endpoint, json=payload)
response.raise_for_status() # 检查 HTTP 错误
data = response.json()
# 修复: 检查 'choices' 是否存在且不为空
choices = data.get('choices', [])
if not choices or 'message' not in choices[0]:
logger.warning(f"API 响应中未找到 'choices' 或 'message'。响应: {data}")
return "抱歉,AI 响应为空或格式不正确。"
# 修复 V4.1: AI 的回复也在 content 列表中
# (但大多数兼容 API 仍然返回 {role:"...", content:"..."})
# 我们必须检查两种情况
message = choices[0].get('message', {})
response_content = message.get('content')
if response_content is None:
logger.warning(f"API 响应中 'content' 为空。响应: {data}")
return "抱歉,AI 响应格式不正确 (content 为 null)。"
# 检查 content 是字符串 (标准) 还是列表 (新格式)
if isinstance(response_content, str):
response_text = response_content
elif isinstance(response_content, list) and len(response_content) > 0 and response_content[0].get('type') == 'text':
response_text = response_content[0].get('text', '')
else:
logger.warning(f"API 响应的 'content' 格式未知: {response_content}")
return "抱歉,AI 响应格式未知。"
logger.debug(f"模型 {model} 成功返回响应。")
return response_text
except httpx.ConnectError as e: # 捕获特定的 ConnectError
logger.error(f"调用 OpenAI API ({model}) 时(httpx)遇到连接错误 (ConnectError): {e}", exc_info=True)
return f"抱歉,AI 连接失败 (ConnectError): {str(e)}"
except httpx.HTTPStatusError as e:
error_message = f"API 返回错误 (状态码 {e.response.status_code})。详情: {e.response.text}"
logger.error(f"调用 OpenAI API ({model}) 时出错: {error_message}", exc_info=True)
return f"抱歉,调用 AI 模型时出错: {error_message}"
except Exception as e:
logger.error(f"调用 OpenAI API ({model}) 时(httpx)遇到意外错误: {e}", exc_info=True)
# 向用户返回一个更友好的错误信息
return f"抱歉,调用 AI 模型时出错: {str(e)}"
async def close(self):
"""关闭 httpx 客户端。"""
await self.client.aclose()
# --- 4. 自定义过滤器 ---
class AdminFilter(filters.BaseFilter):
"""
(V4.5) 自定义 PTB 过滤器,用于检查用户是否为管理员 (从内存缓存读取)。
"""
def __init__(self, bot: 'TelegramBot'): # V4.5: 传入 bot 实例
self.bot = bot
super().__init__(name="AdminFilter")
def filter(self, message: Message) -> bool:
# V4.5: 从内存缓存读取
return message.from_user.id in self.bot.admin_users
class ScopeFilter(filters.BaseFilter):
"""
(V4.5) 自定义 PTB 过滤器,用于控制机器人的响应范围 (从内存缓存读取)。
允许的条件 (OR):
1. 私聊 (private)
2. (群组在白名单中 AND 话题*未*在黑名单中)
"""
def __init__(self, bot: 'TelegramBot'): # V4.5: 传入 bot 实例
self.bot = bot
super().__init__(name="ScopeFilter")
def filter(self, message: Message) -> bool:
# 1. 允许私聊
if message.chat.type == "private":
return True
# 2. 检查群组
if message.chat.type in ("group", "supergroup"):
# 2.1 检查群组是否在白名单中 (V4.5: 从内存读取)
if not message.chat.id in self.bot.whitelisted_groups:
logger.debug(f"忽略来自 {message.chat.id} 的消息 (群组未在白名单)。")
return False
# 2.2 检查话题是否在黑名单中 (V4.5: 从内存读取)
thread_id = message.message_thread_id
thread_id_key = thread_id if thread_id is not None else 0
topic_key = f"{message.chat.id}:{thread_id_key}"
if topic_key in self.bot.blacklisted_topics:
logger.debug(f"忽略来自 {topic_key} 的消息 (话题在黑名单)。")
return False
# 群组在白名单中,且话题不在黑名单中
return True
# 3. 忽略其他所有情况 (如频道)
return False
# --- 5. Telegram 机器人主类 ---
class TelegramBot:
"""
组织所有 Telegram 机器人逻辑、命令和消息处理器。
"""
# 群组回复配置
DEFAULT_GROUP_REPLY_CHANCE = 0.15 # 15% 随机回复概率
TRIGGER_COOLDOWN_SEC = 30 # 30 秒触发词冷却
# 预设解析正则表达式
PRESET_REGEX = re.compile(
r"^(name|user|system|assistant)[\s::]+(.+)",
re.IGNORECASE | re.MULTILINE
)
def __init__(self, config: Config, redis: RedisManager, openai: OpenAIClient):
self.config = config
self.redis = redis
self.openai = openai
# (V4.5) 内存缓存
self.group_context_cache: Dict[str, List[Dict[str, Any]]] = {}
self.cache_lock = asyncio.Lock() # 用于 group_context_cache
self.permission_lock = asyncio.Lock() # (V4.5) 用于权限/触发词缓存
self.save_job: Optional[Job] = None
self.activity_check_job: Optional[Job] = None # V4.7 新增
# (V4.5) 权限和触发词的内存缓存
self.admin_users: Set[int] = set()
self.whitelisted_groups: Set[int] = set()
self.blacklisted_topics: Set[str] = set()
self.group_triggers: Dict[int, str] = {}
self.trigger_cooldowns: Dict[str, float] = {} # Key: "chat_id:word", Val: expiry_timestamp
# (新增) RSS 客户端
self.rss_client = httpx.AsyncClient(timeout=10.0, verify=False) # 同样禁用 SSL
# (V4.8 新增) Setu 客户端
self.setu_client = httpx.AsyncClient(timeout=20.0, verify=False, follow_redirects=True)
# (新增 V4.4) 存储机器人名字
self.bot_name: Optional[str] = "Bot" # 默认值
self.application = ApplicationBuilder() \
.token(config.TELEGRAM_BOT_TOKEN) \
.post_init(self.post_init_setup) \
.post_shutdown(self.post_shutdown_cleanup) \
.job_queue(JobQueue()) \
.build()
# 实例化自定义过滤器 (V4.5: 传入 self)
self.admin_filter = AdminFilter(self)
self.scope_filter = ScopeFilter(self)
# 存储模型列表,避免频繁查询 Redis
self._model_list_cache: List[str] = []
def setup_handlers(self):
"""注册所有的命令和消息处理器。"""
logger.info("正在注册处理器...")
# --- 核心命令 ---
start_handler = CommandHandler("start", self.start_command, filters=filters.ChatType.PRIVATE)
# (V4.4) 为基础指令添加 scope_filter 以实现绝对黑名单
help_handler = CommandHandler("help", self.help_command, filters=self.scope_filter)
new_handler = CommandHandler("new", self.new_command, filters=self.scope_filter)
switch_model_handler = CommandHandler("switchmodel", self.switch_model_command, filters=self.scope_filter)
# (V4.8 新增) Setu 命令
setu_handler = CommandHandler("setu", self.setu_command, filters=self.scope_filter)
# --- 管理员命令 (使用 admin_filter) ---
add_admin_handler = CommandHandler("addadmin", self.add_admin_command, filters=self.admin_filter)
del_admin_handler = CommandHandler("deladmin", self.del_admin_command, filters=self.admin_filter)
save_history_handler = CommandHandler("savehistory", self.save_history_command, filters=self.admin_filter)
# V4.3 重构:
add_group_handler = CommandHandler(
"addgroup",
self.add_group_whitelist_command,
filters=self.admin_filter & filters.ChatType.GROUPS
)
blacklist_topic_handler = CommandHandler(
"blacklisttopic",
self.blacklist_topic_command,
filters=self.admin_filter & filters.ChatType.GROUPS
)
group_admin_filters = self.admin_filter & filters.ChatType.GROUPS
set_trigger_handler = CommandHandler(
"settrigger",
self.set_trigger_command,
filters=group_admin_filters
)
# --- 预设 (Preset) 命令 ---
set_preset_handler = CommandHandler(
"setpreset",
self.set_preset_command,
# 允许私聊, 允许文本(用于粘贴)和 .txt 文档
filters=filters.ChatType.PRIVATE & (filters.TEXT | filters.Document.TXT)
)
list_presets_handler = CommandHandler("listpresets", self.list_presets_command, filters=filters.ChatType.PRIVATE)
switch_preset_handler = CommandHandler("switchpreset", self.switch_preset_command, filters=filters.ChatType.PRIVATE)
del_preset_handler = CommandHandler("delpreset", self.del_preset_command, filters=filters.ChatType.PRIVATE)
# --- (V4.5 新增) RSS 命令 ---
# (V4.5) scope_filter 确保 /subrss 不能在黑名单话题中运行
sub_rss_handler = CommandHandler("subrss", self.sub_rss_command, filters=self.scope_filter)
unsub_rss_handler = CommandHandler("unsubrss", self.unsub_rss_command, filters=self.scope_filter)
# --- 回调处理器 (用于模型切换) ---
model_callback_handler = CallbackQueryHandler(self.select_model_callback, pattern="^model_select_")
# --- 核心消息处理器 (使用 scope_filter) ---
message_handler = MessageHandler(
(filters.TEXT | filters.PHOTO | filters.Document.TXT) & self.scope_filter & (~filters.COMMAND),
self.handle_message
)
# 注册所有处理器
handlers = [
start_handler, help_handler, new_handler, switch_model_handler,
setu_handler, # V4.8 新增
add_admin_handler, del_admin_handler,
add_group_handler, blacklist_topic_handler, # V4.3 更改
set_trigger_handler, save_history_handler,
set_preset_handler, list_presets_handler, switch_preset_handler, del_preset_handler,
sub_rss_handler, unsub_rss_handler, # V4.5 新增
model_callback_handler,
message_handler
]
self.application.add_handlers(handlers)
logger.info("处理器注册完毕。")
async def post_init_setup(self, application: Application):
"""
在机器人启动时(run_polling被调用后)执行的异步初始化任务。
(V4.7 更新)
"""
logger.info("正在执行机器人启动后数据初始化 (post_init)...")
# (V4.4 新增) 获取机器人名字
try:
bot_user = await application.bot.get_me()
self.bot_name = bot_user.name
logger.info(f"成功获取机器人名字: {self.bot_name}")
except Exception as e:
logger.error(f"无法获取机器人名字: {e}", exc_info=True)
self.bot_name = "Assistant" # 设置回退
# (V4.5) 1. 从 Redis 加载权限和配置到内存
self.admin_users = self.redis.get_admin_users()
self.whitelisted_groups = self.redis.get_whitelisted_groups()
self.blacklisted_topics = self.redis.get_blacklisted_topics()
self.group_triggers = self.redis.get_all_group_triggers()
logger.info(f"成功加载 {len(self.admin_users)} 个管理员, {len(self.whitelisted_groups)} 个白名单群组, {len(self.blacklisted_topics)} 个黑名单话题, {len(self.group_triggers)} 个触发词到内存。")
# 2. 同步环境变量 (Admin) 到 Redis (如果 Redis 为空)
self.redis.initialize_from_env(self.config.ADMIN_USERS)
# 3. 获取和缓存模型
models = await self.openai.get_models()
if models:
self._model_list_cache = models
self.redis.cache_model_list(models)
else:
logger.warning("无法从API获取模型列表,将尝试使用 Redis 缓存 (如果存在)。")
cached_models = self.redis.get_cached_model_list()
if cached_models:
self._model_list_cache = cached_models
logger.info("已从 Redis 缓存加载模型列表。")
else:
logger.error("严重: API 和 Redis 缓存均无模型列表。 \switchmodel 将无法工作。")
# 4. 从 Redis 加载群组历史到内存
await self.load_group_history_from_redis()
# 5. 启动定时任务 (群组保存 + RSS + 活跃度)
if not self.application.job_queue:
logger.error("JobQueue 未初始化! 无法启动定时任务。")
else:
# 群组保存
self.save_job = self.application.job_queue.run_repeating(
self.save_all_group_history_to_redis,
interval=3600, # 1 hour
first=3600, # 1 小时后开始
name="hourly_save"
)
logger.info("已启动每小时群组历史缓存定时器。")
# (新增) RSS 检查
if self.config.RSS_URL: # V4.5: 只要 URL 存在就启动
self.application.job_queue.run_repeating(
self.check_rss_feed,
interval=600, # 10 minutes
first=10, # 10 秒后开始
name="rss_check"
)
logger.info(f"已启动每 10 分钟 RSS 检查任务 (URL: {self.config.RSS_URL})。")
else:
logger.info("未配置 RSS_URL,跳过 RSS 任务。")
# (V4.7 新增) 启动群组活跃度检查
self.activity_check_job = self.application.job_queue.run_repeating(
self.check_group_activity,
interval=3600, # 每小时检查一次
first=60, # 启动 1 分钟后
name="group_activity_check"
)
logger.info("已启动每小时群组活跃度检查定时器。")
# 6. 注册 Telegram 命令
commands = [
BotCommand("help", "查看所有指令和帮助"),
BotCommand("new", "开始一个新的对话会话"),
BotCommand("switchmodel", "切换聊天模型"),
BotCommand("setu", "(V4.8) 随机获取一张图片 (可加 tag)"), # <-- 新增
BotCommand("subrss", "(V4.5) 订阅 RSS 更新到此聊天"),
BotCommand("unsubrss", "(V4.5) 取消此聊天的 RSS 订阅"),
BotCommand("setpreset", "(私聊) 设置AI预设 (通过文本或文件)"),
BotCommand("listpresets", "(私聊) 查看我的AI预设"),
BotCommand("switchpreset", "(私聊) 切换AI预设"),
BotCommand("delpreset", "(私聊) 删除AI预设"),
BotCommand("savehistory", "(管理员) 手动保存群组缓存到Redis"),
BotCommand("addgroup", "(管理员/群组中) 将此群组添加至白名单"), # V4.3 新增
BotCommand("blacklisttopic", "(管理员/话题中) 将此话题添加至黑名单"), # V4.3 新增
BotCommand("settrigger", "(管理员/群组中) 设置本群的AI回复触发词"),
BotCommand("addadmin", "(管理员) 添加管理员 (需回复或提供ID)"),
BotCommand("deladmin", "(管理员) 移除管理员 (需回复或提供ID)"),
]
await application.bot.set_my_commands(commands)
logger.info("Telegram Bot 命令已注册。")
# 7. 发送启动通知
op_id = self.config.OP_USER_ID
if not op_id:
logger.warning("未配置 OP_USER_ID,跳过启动通知。")
return
try:
help_text = self._get_help_text()
startup_msg = f"✅ **机器人 {self.bot_name} 已成功启动!**\n\n{help_text}"
await application.bot.send_message(
chat_id=op_id,
text=startup_msg,
parse_mode="Markdown"
)
logger.info(f"已成功向 OP ({op_id}) 发送启动通知。")
except Exception as e:
logger.error(f"向 OP ({op_id}) 发送启动消息失败: {e}", exc_info=True)
async def post_shutdown_cleanup(self, application: Application):
"""
在机器人关闭时(shutdown被调用后)执行的异步清理任务。
(V4.8 更新)
"""
logger.info("正在执行异步清理 (post_shutdown)...")
# 关机前最后保存一次
if self.group_context_cache:
logger.info("正在执行关机前最后一次群组历史保存...")
# 创建一个临时的 context 对象
fake_context = ContextTypes.DEFAULT_TYPE(application=self.application, chat_id=None, user_id=None)
await self.save_all_group_history_to_redis(fake_context)
logger.info("关机前保存完毕。")
if self.openai:
logger.info("正在关闭 OpenAI (httpx) 客户端...")
await self.openai.close()
logger.info("OpenAI (httpx) 客户端已关闭。")
if self.rss_client:
logger.info("正在关闭 RSS (httpx) 客户端...")
await self.rss_client.aclose()
logger.info("RSS (httpx) 客户端已关闭。")
# (V4.8 新增)
if self.setu_client:
logger.info("正在关闭 Setu (httpx) 客户端...")
await self.setu_client.aclose()
logger.info("Setu (httpx) 客户端已关闭。")
# --- 命令处理器实现 ---
def _get_help_text(self) -> str:
"""生成帮助文本。"""
return """
欢迎使用!我是您的 AI 助手。
**基础指令:**
/help - 显示此帮助信息
/new - 忘记上下文,开始新对话
/switchmodel - 选择要对话的 AI 模型
/setu [tag] [tag...] - (V4.8) 随机获取一张图片 (可加 tag)
/subrss - 订阅 RSS 更新到此聊天
/unsubrss - 取消此聊天的 RSS 订阅
**AI 预设 (System Prompt) [仅限私聊]:**
/setpreset - (回复此消息或上传 .txt) 设置预设。格式:
name: 预设名称
system: 你是一个...
user: 示例输入
assistant: 示例回复
/listpresets - 查看我所有的预设
/switchpreset [名称] - 切换预设
/delpreset [名称] - 删除预设
**管理员指令:**
/addgroup - (在群组中) 将此群组加入白名单
/blacklisttopic - (在*话题中*) 将此话题加入黑名单
/settrigger [词] - (在群组中) 设置一个词,包含它将必定触发回复
/savehistory - (任意位置) 手动保存群组缓存到Redis
/addadmin [user_id] - 添加管理员
/deladmin [user_id] - 移除管理员
**使用方法:**
1. (群组) 管理员需要先使用 /addgroup 将群组加入白名单。
2. (群组) 我会回复所有话题,除非你使用 /blacklisttopic 禁言特定话题 (包括 "常规" 话题)。
3. (群组) 默认我只会随机回复 (15%)。如果设置了 /settrigger,包含触发词的消息我会 @ 您并 100% 回复。
4. (私聊) 您可以直接向我发送消息、图片或 .txt 文件。
"""
async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""私聊 \start 命令处理器。"""
await update.message.reply_text(self._get_help_text(), parse_mode=ParseMode.MARKDOWN)
async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""\help 命令处理器。"""
await update.message.reply_text(self._get_help_text(), parse_mode=ParseMode.MARKDOWN)
async def new_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""
\new 命令处理器。
重构: 清除当前上下文。
(新增) 同时清除内存缓存。
"""
user_id = update.effective_user.id
chat_id = update.effective_chat.id
chat_type = update.effective_chat.type
thread_id = update.message.message_thread_id if update.message else None
# V4.3 修复: 确保 thread_id 键一致
thread_id_key = thread_id if thread_id is not None else 0
# 1. 清除 Redis
self.redis.clear_session(user_id, chat_id, thread_id)
# 2. (新增) 如果是群组,也清除内存缓存
if chat_type in ("group", "supergroup"):
# V4.3 修复: 使用正确的 context_key
context_key = self.redis._get_context_key(user_id, chat_id, thread_id)
async with self.cache_lock:
if context_key in self.group_context_cache:
del self.group_context_cache[context_key]
logger.info(f"已为键 {context_key} 清除内存缓存。")
logger.info(f"用户 {user_id}{chat_id}:{thread_id_key} 使用了 /new")
await update.message.reply_text("💡 好的,我们来开始一个全新的对话吧!(当前上下文已清除)")
async def switch_model_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""\switchmodel 命令处理器。显示模型选择键盘。"""
models = self._model_list_cache
if not models:
models = self.redis.get_cached_model_list() or []
if not models:
await update.message.reply_text("抱歉,暂时无法获取模型列表。请稍后再试。")
return
current_model = self.redis.get_current_model(
update.effective_user.id,
self.config.DEFAULT_MODEL
)
keyboard: List[List[InlineKeyboardButton]] = []
# 每行最多放 2 个模型
row = []
for i, model_id in enumerate(models):
# 标记当前选中的模型
button_text = f"✅ {model_id}" if model_id == current_model else model_id
# 按钮文本 (button_text) 也可能超长 (64 字节限制)
# 我们需要按字节截断
max_bytes = 60 # 留 4 字节给 "..."
button_text_bytes = button_text.encode('utf-8')
if len(button_text_bytes) > max_bytes:
# 从末尾开始解码,直到找到一个有效的 UTF-8 截断点
while max_bytes > 0:
try:
button_text = button_text_bytes[:max_bytes].decode('utf-8') + "..."
break
except UnicodeDecodeError:
max_bytes -= 1
else:
# 极端情况,无法截断
button_text = "Model..."
row.append(
InlineKeyboardButton(
button_text,
# 使用索引 (i) 作为 callback_data,避免超长
callback_data=f"model_select_{i}"
)
)
if len(row) == 2:
keyboard.append(row)
row = []
if row: # 添加最后一行 (如果不满)
keyboard.append(row)
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"请选择一个模型 (当前: {current_model}):", reply_markup=reply_markup)
# --- 回调处理器 ---
async def select_model_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理模型选择键盘的回调。"""
query = update.callback_query
await query.answer() # 立即响应回调,消除加载状态
user_id = query.from_user.id
model_name = "" # 预定义
try:
model_index_str = query.data.split("model_select_", 1)[1]
model_index = int(model_index_str)
models = self._model_list_cache
if not models:
models = self.redis.get_cached_model_list() or []
if not models:
await query.edit_message_text("错误: 模型列表缓存已过期,请重试 /switchmodel。")
return
if 0 <= model_index < len(models):
model_name = models[model_index]
else:
raise ValueError("模型索引越界")
except (IndexError, TypeError, ValueError) as e:
logger.warning(f"解析模型回调时出错: {e}. Data: {query.data}")
await query.edit_message_text("选择出错,请重试。")
return
if not model_name:
await query.edit_message_text("无法确定所选模型,请重试。")
return
self.redis.set_current_model(user_id, model_name)
logger.info(f"用户 {user_id} 通过回调将模型切换为 {model_name}")
# 更新原始消息,移除键盘
await query.edit_message_text(f"✅ 模型已切换为: {model_name}")
# --- 管理员命令实现 ---
async def _get_id_from_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> Optional[int]:
"""从命令参数或回复的消息中提取 User ID。"""
# 1. 检查回复
if update.message.reply_to_message:
return update.message.reply_to_message.from_user.id
# 2. 检查参数
if context.args:
try:
return int(context.args[0])
except (ValueError, IndexError):
return None
return None
async def add_admin_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""\addadmin 命令处理器。"""
user_id_to_add = await self._get_id_from_command(update, context)
if not user_id_to_add:
await update.message.reply_text("请回复一个用户或提供 User ID。用法: /addadmin [user_id]")
return
if user_id_to_add in self.admin_users: # V4.5: 读内存
await update.message.reply_text(f"用户 {user_id_to_add} 已经是管理员了。")
return
if self.redis.add_admin(user_id_to_add): # 写 Redis
async with self.permission_lock: # 写内存
self.admin_users.add(user_id_to_add)
logger.info(f"管理员 {update.effective_user.id} 添加了新管理员 {user_id_to_add}")
await update.message.reply_text(f"✅ 成功添加管理员: {user_id_to_add}")
else:
await update.message.reply_text("添加失败,请查看日志。")
async def del_admin_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""\deladmin 命令处理器。"""
user_id_to_remove = await self._get_id_from_command(update, context)
if not user_id_to_remove:
await update.message.reply_text("请回复一个用户或提供 User ID。用法: /deladmin [user_id]")
return
# 阻止 OP (第一个管理员) 被移除
if user_id_to_remove == self.config.OP_USER_ID:
await update.message.reply_text("无法移除超级管理员 (OP)。")
return
if user_id_to_remove not in self.admin_users: # V4.5: 读内存
await update.message.reply_text(f"用户 {user_id_to_remove} 不是管理员。")
return
if self.redis.remove_admin(user_id_to_remove): # 写 Redis
async with self.permission_lock: # 写内存
self.admin_users.discard(user_id_to_remove)
logger.info(f"管理员 {update.effective_user.id} 移除了管理员 {user_id_to_remove}")
await update.message.reply_text(f"✅ 成功移除管理员: {user_id_to_remove}")
else:
await update.message.reply_text("移除失败,请查看日志。")
async def add_group_whitelist_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""(V4.3 新增) /addgroup 命令处理器。"""
chat_id = update.message.chat.id
chat_title = update.message.chat.title
if chat_id in self.whitelisted_groups: # V4.5: 读内存
await update.message.reply_text(f"群组 '{chat_title}' (ID: {chat_id}) 已经在白名单中了。")
return
if self.redis.add_group_whitelist(chat_id): # 写 Redis
async with self.permission_lock: # 写内存
self.whitelisted_groups.add(chat_id)
logger.info(f"管理员 {update.effective_user.id} 将群组 '{chat_title}' ({chat_id}) 添加到白名单。")
await update.message.reply_text(f"✅ 成功将群组 '{chat_title}' (ID: {chat_id}) 加入白名单。\n我现在会在此群组的*所有*未黑名单话题中回复消息。")
else:
await update.message.reply_text("添加白名单失败,请查看日志。")
async def blacklist_topic_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""(V4.3 重构) /blacklisttopic 命令处理器。"""
chat_id = update.message.chat.id
chat_title = update.message.chat.title
thread_id = update.message.message_thread_id
# V4.3 修复: 统一 "常规" 话题
thread_id_key = thread_id if thread_id is not None else 0
topic_key = f"{chat_id}:{thread_id_key}"
topic_name = "常规话题 (General)" if thread_id_key == 0 else f"话题ID {thread_id_key}"
if topic_key in self.blacklisted_topics: # V4.5: 读内存
await update.message.reply_text(f"话题 '{topic_name}' (ID: {topic_key}) 已经在黑名单中了。")
return
if self.redis.add_blacklisted_topic(topic_key): # 写 Redis
async with self.permission_lock: # 写内存
self.blacklisted_topics.add(topic_key)
logger.info(f"管理员 {update.effective_user.id} 将话题 '{topic_name}' ({topic_key}) 添加到黑名单。")
await update.message.reply_text(f"✅ 成功将*此话题* ({topic_name}, ID: {topic_key}) 加入黑名单。\n我现在会忽略此话题的消息。")
else:
await update.message.reply_text("添加黑名单失败,请查看日志。")
async def set_trigger_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/settrigger [词] 命令处理器。"""
chat_id = update.message.chat.id
if not context.args:
await update.message.reply_text("请提供一个触发词。用法: /settrigger [词]")
return
trigger_word = context.args[0].strip()
if not trigger_word:
await update.message.reply_text("触发词不能为空。")
return
try:
self.redis.set_group_trigger(chat_id, trigger_word) # 写 Redis
async with self.permission_lock: # 写内存
self.group_triggers[chat_id] = trigger_word
logger.info(f"管理员 {update.effective_user.id} 在群组 {chat_id} 设置了触发词: {trigger_word}")
await update.message.reply_text(f"✅ 成功! 本群的回复触发词已设置为: \"{trigger_word}\"")
except Exception as e:
logger.error(f"设置群组 {chat_id} 触发词时出错: {e}", exc_info=True)
await update.message.reply_text("设置触发词失败,请查看日志。")
async def save_history_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""(新增) /savehistory (Admin) 手动保存并重置定时器。"""
user_id = update.effective_user.id
logger.info(f"管理员 {user_id} 手动触发了历史保存。")
if not context.job_queue:
logger.error(f"用户 {user_id} 尝试保存历史,但 JobQueue 不可用。")
await update.message.reply_text("❌ 错误: JobQueue 不可用。")
return
# 1. 移除旧
if self.save_job:
self.save_job.schedule_removal()
logger.info("移除了旧的保存定时器。")
# 2. 立即运行
await self.save_all_group_history_to_redis(context)
# 3. 启动新
self.save_job = context.job_queue.run_repeating(
self.save_all_group_history_to_redis,
interval=3600, # 1 hour
first=3600, # 1 小时后
name="hourly_save"
)
logger.info("已启动新的保存定时器。")
await update.message.reply_text("✅ 已手动保存所有群组缓存到 Redis,并重置1小时定时器。")
# --- 预设 (Preset) 命令实现 ---
def _parse_preset_text(self, text: str) -> Optional[Dict[str, Any]]:
"""
辅助函数: 解析预设文本。
返回 {"name": str, "messages": List[Dict]} 或 None。
"""
name: Optional[str] = None
messages: List[Dict[str, str]] = []
matches = self.PRESET_REGEX.finditer(text)
for match in matches:
key = match.group(1).lower()
value = match.group(2).strip()
if key == "name":
name = value
elif key in ("user", "system", "assistant"):
# 修复 V4.1: 将 content 包装为新格式
messages.append({"role": key, "content": [{"type": "text", "text": value}]})
if not name or not messages:
return None
return {"name": name, "messages": messages}
async def set_preset_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""(新增) /setpreset 命令处理器 (文本或 .txt 文件)。"""
user_id = update.effective_user.id
text_to_parse: Optional[str] = None
# 1. 检查 .txt 文件
if update.message.document and update.message.document.mime_type == "text/plain":
try:
doc_file = await update.message.document.get_file()
with io.BytesIO() as file_bytes_io:
await doc_file.download_to_memory(file_bytes_io)
file_bytes_io.seek(0)
text_to_parse = file_bytes_io.getvalue().decode('utf-8')
logger.info(f"用户 {user_id} 正在通过 .txt 文件设置预设。")
except Exception as e:
logger.error(f"下载用户 {user_id} 的 .txt 预设文件时出错: {e}")
await update.message.reply_text(f"抱歉,我无法读取 .txt 文件: {e}")
return
# 2. 检查命令文本 (如果不是文件)
if not text_to_parse:
# 移除 /setpreset 命令本身
if context.args:
text_to_parse = update.message.text.split(None, 1)[-1]
else:
await update.message.reply_text(
"用法错误。\n"
"请使用 /setpreset [预设内容]...\n"
"...或上传一个 .txt 文件并附上 /setpreset 命令。\n\n"
"格式:\n"
"name: 预设名称\n"
"system: 你是一个...\n"
"user: 示例...\n"
"assistant: 好的..."
)
return
# 3. 解析
preset_data = self._parse_preset_text(text_to_parse)
if not preset_data:
await update.message.reply_text(
"❌ 解析失败!\n"
"请确保您的格式正确,并且至少包含 'name' 和一个 'system/user/assistant' 角色。"
)
return
name = preset_data["name"]
messages = preset_data["messages"]
# 4. 保存
if self.redis.set_preset(user_id, name, messages):
logger.info(f"用户 {user_id} 成功设置了预设: {name}")
await update.message.reply_text(
f"✅ 预设已保存!\n"
f"名称: **{name}**\n"
f"包含 {len(messages)} 条消息。\n\n"
f"使用 `/switchpreset {name}` 来激活它。",
parse_mode=ParseMode.MARKDOWN
)
else:
await update.message.reply_text("❌ 预设保存失败,请查看日志。")
async def list_presets_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""(新增) /listpresets 命令处理器。"""
user_id = update.effective_user.id
presets = self.redis.list_presets(user_id)
active_preset = self.redis.get_active_preset_name(user_id)
if not presets:
await update.message.reply_text("您还没有保存任何预设。\n使用 /setpreset 来创建一个。")
return
message = "以下是您保存的预设:\n\n"
for name in presets:
if name == active_preset:
message += f"▶️ **{name}** (当前激活)\n"
else:
message += f"• {name}\n"
message += "\n使用 `/switchpreset [名称]` 来切换。"
await update.message.reply_text(message, parse_mode=ParseMode.MARKDOWN)
async def switch_preset_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""(新增) /switchpreset [name] 命令处理器。"""
user_id = update.effective_user.id
if not context.args:
# 如果没有参数,则关闭预设
self.redis.set_active_preset(user_id, "")
await update.message.reply_text("✅ 已关闭所有预设。")
return
preset_name = context.args[0].strip()
# 检查预设是否存在
if not self.redis.get_preset(user_id, preset_name):
await update.message.reply_text(f"❌ 找不到名为 '{preset_name}' 的预设。")
return
self.redis.set_active_preset(user_id, preset_name)
logger.info(f"用户 {user_id} 切换预设为: {preset_name}")
await update.message.reply_text(f"✅ 成功激活预设: **{preset_name}**", parse_mode=ParseMode.MARKDOWN)
async def del_preset_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""(新增) /delpreset [name] 命令处理器。"""
user_id = update.effective_user.id
if not context.args:
await update.message.reply_text("请提供要删除的预设名称。用法: /delpreset [名称]")
return
preset_name = context.args[0].strip()
if self.redis.delete_preset(user_id, preset_name):
logger.info(f"用户 {user_id} 删除了预设: {preset_name}")
await update.message.reply_text(f"✅ 成功删除预设: {preset_name}")
else:
await update.message.reply_text(f"❌ 找不到名为 '{preset_name}' 的预设,或删除失败。")
# --- (V4.5 新增) RSS 命令实现 ---
async def sub_rss_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""(V4.5 新增) /subrss 命令处理器。"""
chat_id = update.effective_chat.id
if self.redis.add_rss_subscriber(chat_id):
logger.info(f"RSS: 新订阅者: {chat_id}")
await update.message.reply_text("✅ 成功订阅 RSS 更新到此聊天。")
else:
await update.message.reply_text("ℹ️ 此聊天已经订阅了 RSS。")
async def unsub_rss_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""(V4.5 新增) /unsubrss 命令处理器。"""
chat_id = update.effective_chat.id
if self.redis.remove_rss_subscriber(chat_id):
logger.info(f"RSS: 取消订阅: {chat_id}")
await update.message.reply_text("✅ 已取消此聊天的 RSS 订阅。")
else:
await update.message.reply_text("ℹ️ 此聊天未订阅 RSS。")
# --- (V4.8 新增) Setu 命令实现 ---
async def setu_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""(V4.8) /setu [tag...] 命令处理器。"""
user_id = update.effective_user.id
chat_id = update.effective_chat.id
thread_id = update.message.message_thread_id
await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
base_url = "https://api.lolicon.app/setu/v2"
params = {}
params["size"] = "small"
params["r18"] = 2
if context.args:
params["tag"] = context.args
logger.info(f"用户 {user_id}{chat_id}:{thread_id} 请求带 tags 的 Setu: {context.args}")
else:
logger.info(f"用户 {user_id}{chat_id}:{thread_id} 请求随机 Setu")
try:
# 1. 请求 API (V4.9 修复: 移除 async with)
response = await self.setu_client.get(base_url, params=params)
response.raise_for_status()
data = response.json()
if data.get("error") or not data.get("data"):
logger.warning(f"Setu API 返回错误或空数据: {data.get('error')}")
await update.message.reply_text("抱歉,API 返回错误或未找到图片。")
return
image_data = data["data"][0]
image_url = image_data.get("urls", {}).get("small")
if not image_url:
logger.warning(f"Setu API 响应中缺少 small URL: {image_data}")
await update.message.reply_text("抱歉,API 响应格式不正确,缺少图片 URL。")
return
# 2. 下载图片 (V4.9 修复: 移除 async with)
# API 返回的 URL 可能需要替换为 pixiv.re 代理
image_url = image_url.replace("i.pixiv.cat", "i.pixiv.re")
logger.debug(f"正在下载 Setu 图片: {image_url}")
image_response = await self.setu_client.get(image_url)
image_response.raise_for_status()
image_bytes = image_response.content
# 3. 准备信息
caption = (
f"Title: {image_data.get('title', 'N/A')}\n"
f"Author: {image_data.get('author', 'N/A')}\n"
f"PID: {image_data.get('pid', 'N/A')}"
)
# 4. 发送图片
await update.message.reply_photo(
photo=image_bytes,
caption=caption,
message_thread_id=thread_id
)
# (V4.8) 更新活跃度时间戳
if update.message.chat.type in ("group", "supergroup"):
context_key = self.redis._get_context_key(user_id, chat_id, thread_id)
self.redis.update_last_response_time(context_key)
except httpx.HTTPStatusError as e:
logger.error(f"Setu API 请求失败 (状态码 {e.response.status_code}): {e.response.text}", exc_info=True)
await update.message.reply_text(f"抱歉,获取图片失败 (HTTP 错误): {e.response.status_code}")
except Exception as e:
logger.error(f"Setu 命令执行失败: {e}", exc_info=True)
await update.message.reply_text(f"抱歉,获取图片时遇到未知错误。")
# --- (V4.5 新增) 内存缓存辅助方法 ---
async def _add_to_group_cache(self, context_key: str, role: str, content: Any):
"""
(新增) 安全地向内存缓存添加消息并截断。
(V4.1: content 已经是新格式的列表)
"""
async with self.cache_lock:
history = self.group_context_cache.get(context_key, [])
history.append({"role": role, "content": content})
# 截断
truncated_history = history[-self.redis.CONTEXT_HISTORY_LIMIT:]
self.group_context_cache[context_key] = truncated_history
def _is_trigger_on_cooldown(self, chat_id: int, word: str) -> bool:
"""(V4.5) 检查内存中的触发词冷却"""
cooldown_key = f"{chat_id}:{word}"
expiry_time = self.trigger_cooldowns.get(cooldown_key)
if expiry_time and time.time() < expiry_time:
return True # Still on cooldown
return False
def _set_trigger_cooldown(self, chat_id: int, word: str):
"""(V4.5) 设置内存中的触发词冷却"""
cooldown_key = f"{chat_id}:{word}"
self.trigger_cooldowns[cooldown_key] = time.time() + self.TRIGGER_COOLDOWN_SEC
async def load_group_history_from_redis(self):
"""(新增) 启动时从 Redis 加载所有群组上下文到内存。"""
logger.info("正在从 Redis 加载群组历史到内存缓存...")
count = 0
try:
# 使用 scan 替代 keys,避免阻塞
cursor = 0
while True:
# V4.2 修复: 确保 scan 使用 self.redis.redis (原始客户端)
cursor, keys = self.redis.redis.scan(cursor, match="context:group:*", count=100)
if keys:
async with self.cache_lock:
for key_bytes in keys:
key = key_bytes.decode('utf-8') # 解码
raw_data = self.redis.redis.get(key)
if raw_data:
try:
history = json.loads(raw_data)
# 截断以防万一
self.group_context_cache[key] = history[-self.redis.CONTEXT_HISTORY_LIMIT:]
count += 1
except json.JSONDecodeError:
logger.warning(f"无法解析 Redis key {key} 的JSON数据")
if cursor == 0:
break
logger.info(f"成功加载 {count} 个群组上下文到内存。")
except Exception as e:
logger.error(f"从 Redis 加载群组历史时出错: {e}", exc_info=True)
async def save_all_group_history_to_redis(self, context: ContextTypes.DEFAULT_TYPE):
"""(新增) 将所有内存中的群组缓存保存回 Redis。"""
logger.info("定时任务: 正在将群组历史缓存保存到 Redis...")
count = 0
cache_copy = {}
async with self.cache_lock:
# 复制一份以避免在迭代时修改
cache_copy = self.group_context_cache.copy()
if not cache_copy:
logger.info("定时任务: 内存缓存为空,无需保存。")
return
try:
# (upstash-redis 的 pipeline 不支持循环外定义)
# 逐个 set
for key, history in cache_copy.items():
self.redis.redis.set(
key,
json.dumps(history),
ex=self.redis.SESSION_EXPIRATION_SEC
)
count += 1
logger.info(f"定时任务: 成功将 {count} 个群组上下文保存到 Redis。")
except Exception as e:
logger.error(f"定时保存群组历史到 Redis 时出错: {e}", exc_info=True)
# --- (新增) RSS 检查任务 ---
async def check_rss_feed(self, context: ContextTypes.DEFAULT_TYPE):
"""(V4.5 升级) 每 10 分钟检查一次 RSS 订阅。"""
logger.info("RSS: 正在检查 RSS... ")
try:
# (V4.5) 从 Redis 读取订阅者
subscriber_chat_ids = self.redis.get_rss_subscribers()
if not subscriber_chat_ids:
logger.info("RSS: 没有订阅者,跳过检查。")
return
# (V4.9 修复: 移除 async with)
response = await self.rss_client.get(self.config.RSS_URL)
response.raise_for_status()
xmlText = response.text
# 使用用户提供的正则表达式
match = re.search(r'<rdf:li\s+rdf:resource="[^"]*\/article\/(\d+)"[\s\/>]', xmlText)
if not match:
logger.warning("RSS: 无法在 XML 中提取到文章链接。")
return
latest_link_id = match.group(1)
last_known_link = self.redis.get_last_rss_link()
if latest_link_id != last_known_link and latest_link_id:
logger.info(f"RSS: 发现新文章! ID: {latest_link_id}. 正在推送给 {len(subscriber_chat_ids)} 个聊天。")
message = f"主人,您订阅的魔装影姫cien更新啦!\n\nhttps://ci-en.dlsite.com/creator/4551/article/{latest_link_id}"
tasks = []
for chat_id in subscriber_chat_ids:
tasks.append(context.bot.send_message(chat_id, message))
results = await asyncio.gather(*tasks, return_exceptions=True)
# 检查发送结果
for i, result in enumerate(results):
if isinstance(result, Exception):
chat_id = list(subscriber_chat_ids)[i]
logger.error(f"RSS: 推送给 {chat_id} 失败: {result}")
# 更新 Redis
self.redis.set_last_rss_link(latest_link_id)
else:
logger.info("RSS: 未发现新文章。")
except Exception as e:
logger.error(f"RSS 任务失败: {e}", exc_info=True)
# --- (V4.7 新增) 活跃度检查任务 ---
async def check_group_activity(self, context: ContextTypes.DEFAULT_TYPE):
"""(V4.7) 每小时检查一次群组活跃度。"""
logger.info("定时任务: 正在检查群组活跃度...")
now = time.time()
# 1. 从 Redis 读取所有时间戳
last_times = self.redis.get_all_last_response_times()
# 2. 从内存读取黑名单 (V4.5)
blacklisted_topics_mem = self.blacklisted_topics
# 3. 获取 OP 的预设 (如果 OP 存在)
op_preset_messages = []
if self.config.OP_USER_ID:
op_preset_messages = self.redis.get_active_preset_messages(self.config.OP_USER_ID)
# 4. 准备硬编码的提示 (V4.1 格式)
reengage_prompt = [
{"type": "text", "text": "【现在在这个群聊里已经过去24小时没有人回复消息了,说句话活跃一下气氛吧】"}
]
tasks_to_run = []
for context_key, last_time in last_times.items():
try:
# 只检查群组
if not context_key.startswith("context:group:"):
continue
# 检查是否在黑名单中 (从内存读取)
if context_key in blacklisted_topics_mem:
continue
# 检查是否超过 24 小时 (86400 秒)
if now - last_time > 86400:
logger.info(f"活跃度检查: 话题 {context_key} 已超过 24 小时未响应。")
# 解析 chat_id 和 thread_id
parts = context_key.split(':')
chat_id = int(parts[2])
thread_id = int(parts[3])
# 准备 API 请求
history_for_api = op_preset_messages + [{"role": "user", "content": reengage_prompt}]
# (V4.7) 添加一个辅助 coroutine 来处理单个请求
tasks_to_run.append(self.send_reengage_message(
context, chat_id, thread_id, history_for_api, context_key
))
except Exception as e:
logger.error(f"处理活跃度检查 {context_key} 时出错: {e}")
if tasks_to_run:
logger.info(f"活跃度检查: 发现 {len(tasks_to_run)} 个不活跃的话题,正在发送消息...")
await asyncio.gather(*tasks_to_run)
else:
logger.info("定时任务: 所有群组均在 24 小时内活跃。")
async def send_reengage_message(
self,
context: ContextTypes.DEFAULT_TYPE,
chat_id: int,
thread_id: int,
history_for_api: List[Dict[str, Any]],
context_key: str
):
"""(V4.7) 活跃度检查的辅助函数,用于生成和发送消息。"""
try:
# 1. 生成 AI 回复 (使用默认模型)
bot_response_text = await self.openai.generate_response(
self.config.DEFAULT_MODEL,
history_for_api
)
is_error = False
if bot_response_text is None or bot_response_text.startswith("抱歉,"):
is_error = True
logger.warning(f"活跃度检查: AI 为 {context_key} 生成了错误/空回复。")
return # 不发送错误消息
# 2. 准备机器人回复 (V4.4 格式)
bot_name = self.bot_name or "Assistant"
# (V4.7) 活跃度消息不 @ 任何人
prefixed_response = f"{bot_name}: {bot_response_text}"
assistant_api_content = [{"type": "text", "text": prefixed_response}]
thread_id_to_send = thread_id if thread_id != 0 else None
# 3. 发送消息
try:
await context.bot.send_message(
chat_id=chat_id,
text=bot_response_text, # V4.7: 不带前缀
message_thread_id=thread_id_to_send,
parse_mode=ParseMode.MARKDOWN
)
except BadRequest as e:
if "Can't parse entities" in str(e):
logger.warning(f"活跃度检查 (Markdown 失败): {e}. 正在作为纯文本重试。")
await context.bot.send_message(
chat_id=chat_id,
text=bot_response_text, # V4.7: 不带前缀
message_thread_id=thread_id_to_send,
parse_mode=None
)
else:
raise # 抛出其他 BadRequest
# 4. 更新内存缓存
await self._add_to_group_cache(context_key, "assistant", assistant_api_content)
# 5. 更新 Redis 中的最后响应时间 (!!)
self.redis.update_last_response_time(context_key)
logger.info(f"活跃度检查: 成功发送消息到 {context_key} 并更新时间戳。")
except Exception as e:
logger.error(f"发送活跃度消息到 {context_key} 时失败: {e}", exc_info=True)
# --- 核心消息处理器 ---
async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""
处理所有符合 ScopeFilter 的文本、图片和 .txt 文件消息。
(V4.6 更新)
"""
user = update.effective_user
if not user:
logger.warning("无法获取用户信息 (消息可能来自频道?),忽略。")
return
user_id = user.id
chat_id = update.effective_chat.id
chat_type = update.effective_chat.type
thread_id = update.message.message_thread_id if update.message else None
text_content = update.message.text or update.message.caption
# (V4.4 新增) 获取用户名
user_name = user.first_name or user.full_name or "User"
# --- 1. 准备用户消息 (多模态) ---
user_api_content: List[Dict[str, Any]] = []
# 提取文本 (来自消息或图片标题)
if text_content:
# (V4.4) 群聊添加前缀
prefixed_text = f"{user_name}: {text_content}" if chat_type != "private" else text_content
user_api_content.append({"type": "text", "text": prefixed_text})
# (新增) 提取 .txt 文件
if update.message.document and update.message.document.mime_type == "text/plain":
try:
doc_file = await update.message.document.get_file()
with io.BytesIO() as file_bytes_io:
await doc_file.download_to_memory(file_bytes_io)
file_bytes_io.seek(0)
file_text = file_bytes_io.getvalue().decode('utf-8')
# (V4.4) 群聊添加前缀
file_info = f"[上传了 .txt 文件]: {file_text}"
prefixed_text = f"{user_name}: {file_info}" if chat_type != "private" else file_info
user_api_content.append({"type": "text", "text": prefixed_text})
logger.info(f"已为用户 {user_id} 成功读取 .txt 文件。")
except Exception as e:
logger.error(f"处理用户 {user_id} 的 .txt 文件时出错: {e}", exc_info=True)
await update.message.reply_text("抱歉,我无法读取这个 .txt 文件。")
return
# 提取图片 (如果有)
if update.message.photo:
try:
photo = update.message.photo[-1] # 获取最高清的图片
photo_file = await photo.get_file()
with io.BytesIO() as file_bytes_io:
await photo_file.download_to_memory(file_bytes_io)
file_bytes_io.seek(0)
base64_image = base64.b64encode(file_bytes_io.getvalue()).decode('utf-8')
# (V4.1) 图像部分不需要名字前缀
image_url_payload = {"url": f"data:image/jpeg;base64,{base64_image}"}
user_api_content.append({"type": "image_url", "image_url": image_url_payload})
logger.info(f"已为用户 {user_id} 成功编码图片。")
except Exception as e:
logger.error(f"处理用户 {user_id} 的图片时出错: {e}", exc_info=True)
await update.message.reply_text("抱歉,我无法处理这张图片。")
return
# 如果没有提取到任何内容 (例如,只有贴纸),则忽略
if not user_api_content:
logger.debug(f"来自用户 {user_id} 的消息没有可处理的内容,忽略。")
return
# --- 2. (新增) 立即更新群组内存缓存 (用户消息) ---
if chat_type in ("group", "supergroup"):
context_key = self.redis._get_context_key(user_id, chat_id, thread_id)
await self._add_to_group_cache(context_key, "user", user_api_content)
# --- 3. 决策: 是否回复? (群组逻辑) ---
should_reply = False
is_random_reply = False # (V4.5 新增) 标记是否为随机回复
if chat_type == "private":
should_reply = True
elif chat_type in ("group", "supergroup"):
trigger_word = self.group_triggers.get(chat_id) # V4.5: 读内存
# 修复 V4.0: 移除 \b (单词边界),实现包含匹配
if text_content and trigger_word and re.search(re.escape(trigger_word), text_content, re.IGNORECASE):
# V4.5: 读/写内存冷却
if not self._is_trigger_on_cooldown(chat_id, trigger_word):
should_reply = True
is_random_reply = False # (V4.5) 这是触发词回复
logger.info(f"群组 {chat_id} 触发词 '{trigger_word}' 被命中,设置冷却。")
self._set_trigger_cooldown(chat_id, trigger_word)
else:
logger.debug(f"群组 {chat_id} 触发词 '{trigger_word}' 仍在冷却中,忽略。")
return # 冷却中,不回复
if not should_reply:
if random.random() < self.DEFAULT_GROUP_REPLY_CHANCE:
should_reply = True
is_random_reply = True # (V4.5) 这是随机回复
logger.info(f"群组 {chat_id} 随机回复 (15%) 命中。")
else:
logger.debug(f"群组 {chat_id} 随机回复 (15%) 未命中,忽略。")
return # 未命中随机,不回复
# --- 决策完毕 ---
if not should_reply:
return # 最终决定不回复
# 4. 发送 "正在输入..." 状态
try:
await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
except Exception:
pass # 忽略错误
# 5. 获取会话、模型和历史
model = self.redis.get_current_model(user_id, self.config.DEFAULT_MODEL)
preset_messages = self.redis.get_active_preset_messages(user_id)
# --- 重构: 获取历史记录 ---
history_from_cache = []
if chat_type == "private":
# 私聊: 从 Redis 读取
history_from_cache = self.redis.get_conversation_history(user_id, chat_id, thread_id)
elif chat_type in ("group", "supergroup"):
# 群组: 从内存缓存读取
context_key = self.redis._get_context_key(user_id, chat_id, thread_id)
async with self.cache_lock:
# 注意: 缓存中已包含当前的用户消息,所以我们不再需要 + user_api_content
history_from_cache = self.group_context_cache.get(context_key, [])
# 准备 API 请求历史
if chat_type == "private":
# 私聊: 预设 + Redis历史 + 当前消息
history_for_api = preset_messages + history_from_cache + [{"role": "user", "content": user_api_content}]
else:
# 群组: 预设 + 内存历史 (已包含当前消息)
history_for_api = preset_messages + history_from_cache
# 6. 调用 API
bot_response_text = await self.openai.generate_response(model, history_for_api)
# --- 7. 错误检查 ---
is_error = False
if bot_response_text is None:
is_error = True
bot_response_text = "抱歉,AI 响应为空 (None),请重试。"
elif bot_response_text.startswith("抱歉,"):
is_error = True
# 8. (V4.6 更改) @ 提及 (仅限触发词回复)
mention_prefix = ""
if chat_type != "private" and not is_random_reply: # 仅在群组的*触发词*回复时
try:
# [⁠](tg://user?id=12345) (使用 U+2060 WORD JOINER 作为 "不可见" 链接文本)
mention_prefix = f"[\u2060](tg://user?id={user_id}) "
logger.info(f"为触发词回复添加 @{user_name} 提及。")
except Exception as e:
logger.warning(f"为触发词回复添加 @ 提及失败: {e}")
# 失败也无妨,继续执行
# (V4.5) 将提及(如果有)添加到回复文本的开头
bot_response_text = mention_prefix + bot_response_text
# 9. 回复用户
try:
# (新增 V4.2) 尝试 Markdown
await update.message.reply_text(
bot_response_text,
message_thread_id=thread_id,
parse_mode=ParseMode.MARKDOWN
)
except BadRequest as e:
# (V4.3 修复) 如果 Markdown 解析失败 (例如格式错误),则作为普通文本发送
# 停止使用 reply_text 以避免 "Message not found"
if "Can't parse entities" in str(e):
logger.warning(f"Markdown 解析失败: {e}. 正在作为纯文本重试。")
try:
await context.bot.send_message(
chat_id=chat_id,
text=bot_response_text,
message_thread_id=thread_id,
parse_mode=None # 纯文本
)
except Exception as fallback_e:
logger.error(f"纯文本回退发送失败: {fallback_e}", exc_info=True)
else:
logger.error(f"发送消息时发生意外的 BadRequest: {e}", exc_info=True)
except Exception as e:
logger.error(f"发送回复时发生未知错误: {e}", exc_info=True)
# 10. (重要) 只有在 *没有* 错误时才保存上下文
if not is_error:
# (V4.4 新增) 为群聊 AI 回复添加前缀
if chat_type != "private":
bot_name = self.bot_name or "Assistant"
user_name = user.first_name or "User"
# (V4.5 修复) 存储 *不带* @ 提及的原始回复
original_response_text = bot_response_text[len(mention_prefix):]
prefixed_response = f"{bot_name}: [回复 @{user_name}]: {original_response_text}"
assistant_api_content = [{"type": "text", "text": prefixed_response}]
else:
assistant_api_content = [{"type": "text", "text": bot_response_text}]
if chat_type == "private":
# 私聊: 将 "用户" 和 "助手" 都写入 Redis
self.redis.add_to_conversation(user_id, chat_id, thread_id, "user", user_api_content)
self.redis.add_to_conversation(user_id, chat_id, thread_id, "assistant", assistant_api_content)
# (V4.7) 私聊也更新时间戳 (如果用户想设置活跃度定时器)
# context_key = self.redis._get_context_key(user_id, chat_id, thread_id)
# self.redis.update_last_response_time(context_key)
elif chat_type in ("group", "supergroup"):
# 群组: 仅将 "助手" 写入内存缓存 (用户消息已在第2步写入)
context_key = self.redis._get_context_key(user_id, chat_id, thread_id)
await self._add_to_group_cache(context_key, "assistant", assistant_api_content)
# (V4.7) 更新群组的最后响应时间
self.redis.update_last_response_time(context_key)
else:
logger.info(f"检测到 AI 错误,跳过历史记录保存。 (User: {user_id}, Chat: {chat_id})")
# --- 6. 启动器 ---
def main():
"""
主函数:初始化所有组件并启动机器人。
"""
openai_client = None # 在 try 外部定义
app = None # 在 try 外部定义
try:
# 1. 初始化
config = Config()
redis_manager = RedisManager(config)
openai_client = OpenAIClient(config)
bot = TelegramBot(config, redis_manager, openai_client)
# 2. 注册处理器
bot.setup_handlers()
# 3. 获取 application 实例
app = bot.application
# 4. 移除手动的 await app.initialize() 和其他异步设置
# 它们现在由 post_init 自动处理。
# 5. 开始轮询 (这是阻塞的,直到机器人停止)
logger.info("机器人启动,开始轮⚫询...")
app.run_polling(allowed_updates=Update.ALL_TYPES)
except ValueError as e:
# Config 缺失必要变量时会触发
logger.critical(f"启动失败: {e}")
# 退出,因为没有配置无法运行
return
except Exception as e:
logger.critical(f"机器人主程序遇到致命错误: {e}", exc_info=True)
finally:
# 优雅关闭
# post_shutdown_cleanup 会自动处理它们
if app:
logger.info("Telegram 轮询已停止。")
logger.info("机器人已停止。")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
logger.info("检测到 Ctrl+C,正在关闭...")