aibot / APP /bot_utils.py
Pikilap's picture
refactor(bot_utils, commands, handlers):通过增强函数格式、在 send_bot_message 中添加reply_to_message_id支持以及优化模型分页处理来提高代码的可读性和结构。更新命令中的配置显示并简化消息处理中的错误处理。
49e42ce
import time
import logging
import json
import requests
import telegramify_markdown
from telebot.apihelper import ApiTelegramException
from db_utils import get_db_connection, release_db_connection
from telebot import types
def call_ai_stream_api(api_url, model, messages, api_key):
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}',
}
api_url = (
api_url.rstrip("/") + "/chat/completions"
if not api_url.endswith("/chat/completions")
else api_url
)
data = {
"messages": messages,
"model": model,
"stream": True,
"temperature": 1.0,
}
try:
with requests.post(
api_url, json=data, headers=headers, stream=True
) as response:
response.raise_for_status()
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8').replace("data: ", "")
if decoded_line.strip() == "[DONE]":
break
try:
json_data = json.loads(decoded_line)
if choices := json_data.get("choices"):
if (
content := choices[0]
.get("delta", {})
.get("content")
or choices[0].get("message", {}).get("content")
):
yield content
except json.JSONDecodeError as e:
logging.error(f"JSON 解码失败: {e},数据: {decoded_line}")
except requests.exceptions.RequestException as e:
logging.error(f"AI API 请求失败: {e}")
raise
def send_bot_message(
bot,
chat_id,
message,
parse_mode='MarkdownV2',
disable_web_page_preview=None,
reply_to_message_id=None,
):
try:
if parse_mode == "HTML":
sent_message = bot.send_message(
chat_id,
message,
parse_mode="HTML",
disable_web_page_preview=disable_web_page_preview,
reply_to_message_id=reply_to_message_id,
)
elif parse_mode == "MarkdownV2":
escaped_message = telegramify_markdown.markdownify(message)
sent_message = bot.send_message(
chat_id,
escaped_message,
parse_mode='MarkdownV2',
disable_web_page_preview=disable_web_page_preview,
reply_to_message_id=reply_to_message_id,
)
else:
sent_message = bot.send_message(
chat_id,
message,
disable_web_page_preview=disable_web_page_preview,
reply_to_message_id=reply_to_message_id,
)
return sent_message
except ApiTelegramException as e:
error_message = str(e)
logging.error(f"发送消息到聊天 {chat_id} 失败: {error_message}")
bot.send_message(chat_id, f"错误: {error_message}", parse_mode='MarkdownV2')
except Exception as e:
logging.error(f"发送消息到聊天 {chat_id} 失败: {e}")
bot.send_message(chat_id, f"错误: {e}", parse_mode='MarkdownV2')
def edit_bot_message(bot, chat_id, message_id, text, parse_mode='MarkdownV2'):
try:
bot.edit_message_text(
chat_id=chat_id, message_id=message_id, text=text, parse_mode=parse_mode
)
return True
except ApiTelegramException as e:
if e.error_code == 429:
retry_after = int(e.description.split()[-1])
logging.warning(f"达到速率限制,将在 {retry_after} 秒后重试")
time.sleep(retry_after)
elif e.error_code == 400 and "message is not modified" in e.description:
logging.warning("消息内容未更改,跳过更新")
else:
logging.error(f"更新消息失败: {e}")
return False
def process_stream_response(
bot, user_id, chat_id, reply_to_message_id, session_key, config
):
(
base_delay,
max_delay,
reset_threshold,
delay,
retry_count,
success_count,
) = (0.5, 5, 5, 0.5, 0, 0)
full_response = ""
message_id = None
last_edit_time = time.time()
conn = get_db_connection()
if not conn:
return
try:
with conn.cursor() as cursor:
session = get_session_data(cursor, session_key)
final_messages = (
[{"role": "system", "content": config.get("ai_prompt")}]
if config.get("ai_prompt")
else []
)
final_messages.extend(session["messages"])
for content_chunk in call_ai_stream_api(
config["ai_api"], config["ai_model"], final_messages, config["ai_key"]
):
full_response += content_chunk
if content_chunk:
if message_id is None:
message_id = send_initial_message(
bot, chat_id, full_response, reply_to_message_id
)
if not message_id:
return
elif time.time() - last_edit_time >= 1:
if not edit_bot_message(
bot,
chat_id,
message_id,
telegramify_markdown.markdownify(full_response),
):
continue
last_edit_time = time.time()
time.sleep(delay)
delay, success_count = base_delay, success_count + 1
if success_count >= reset_threshold:
retry_count, success_count = 0, 0
if message_id:
edit_bot_message(
bot, chat_id, message_id, telegramify_markdown.markdownify(full_response)
)
update_session_data(
cursor,
session_key,
session,
full_response,
message_id,
config.get("max_token"),
)
conn.commit()
logging.info(f"消息 {message_id} 已添加到删除队列,聊天 ID: {chat_id}")
except Exception as e:
send_bot_message(
bot,
chat_id,
f"*请求错误*\n`{str(e)}`",
reply_to_message_id=reply_to_message_id,
)
logging.error(f"处理流式响应失败: {e}, 用户ID: {user_id}, 聊天ID: {chat_id}")
finally:
release_db_connection(conn)
def get_session_data(cursor, session_key):
cursor.execute(
"SELECT messages, message_ids FROM user_sessions WHERE session_key = %s",
(session_key,),
)
session_row = cursor.fetchone()
return (
{"messages": session_row[0], "message_ids": session_row[1]}
if session_row
else {"messages": [], "message_ids": []}
)
def update_session_data(cursor, session_key, session, full_response, message_id, max_token):
session["messages"].append({"role": "assistant", "content": full_response})
session["message_ids"].append(message_id)
if max_token and len(session["messages"]) > max_token * 2:
session["message_ids"], session["messages"] = (
session["message_ids"][2:],
session["messages"][2:],
)
cursor.execute(
"""
INSERT INTO user_sessions (session_key, messages, message_ids) VALUES (%s, %s, %s)
ON CONFLICT (session_key) DO UPDATE SET messages = EXCLUDED.messages, message_ids = EXCLUDED.message_ids
""",
(
session_key,
json.dumps(session["messages"]),
json.dumps(session["message_ids"]),
),
)
def send_initial_message(bot, chat_id, full_response, reply_to_message_id):
escaped_text = telegramify_markdown.markdownify(full_response)
sent_message = send_bot_message(
bot, chat_id, escaped_text, reply_to_message_id=reply_to_message_id
)
if sent_message:
add_message_to_delete_queue(chat_id, sent_message.message_id)
logging.info(f"已发送初始消息 {sent_message.message_id} 到聊天 {chat_id},并添加到删除队列")
return sent_message.message_id
else:
send_bot_message(
bot, chat_id, "*请求错误*\n`初始消息发送失败`", reply_to_message_id=reply_to_message_id
)
logging.error(f"初始消息发送失败,聊天ID: {chat_id}")
return None
def add_message_to_delete_queue(chat_id, message_id):
conn = get_db_connection()
if not conn:
return
try:
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO bot_messages (chat_id, message_id) VALUES (%s, %s) ON CONFLICT (chat_id, message_id) DO NOTHING",
(chat_id, message_id),
)
conn.commit()
logging.info(f"消息 {message_id} 已添加到删除队列,聊天 ID: {chat_id}")
except Exception as e:
logging.error(f"将消息 {message_id} 添加到删除队列失败: {e}")
finally:
release_db_connection(conn)
def delete_bot_messages(bot):
while True:
conn = None
sleep_time = 30
try:
conn = get_db_connection()
if conn:
with conn.cursor() as cursor:
cursor.execute("SELECT MIN(message_delete_delay) FROM user_configs")
min_delay = cursor.fetchone()[0]
if min_delay is None:
min_delay = 60
sleep_time = 30
else:
sleep_time = max(min_delay // 2, 1)
cursor.execute(
"""
SELECT bot_messages.chat_id, bot_messages.message_id, user_configs.message_delete_delay
FROM bot_messages
JOIN user_configs ON bot_messages.chat_id = user_configs.chat_id
WHERE bot_messages.created_at <= current_timestamp - interval '%s seconds'
""",
(min_delay,),
)
messages_to_delete = cursor.fetchall()
for chat_id, message_id, delay in messages_to_delete:
try:
if delay is not None:
bot.delete_message(chat_id, message_id)
logging.info(f"已删除消息 {message_id},聊天 ID: {chat_id}")
cursor.execute(
"DELETE FROM bot_messages WHERE chat_id = %s AND message_id = %s",
(chat_id, message_id),
)
conn.commit()
except ApiTelegramException as e:
logging.error(
f"删除消息 {message_id} 失败,聊天 ID: {chat_id},错误: {e}"
)
handle_delete_message_error(
cursor, conn, chat_id, message_id, e
)
except Exception as e:
logging.error(
f"处理消息 {message_id} 时发生未知错误,聊天 ID: {chat_id},错误: {e}"
)
else:
logging.error("获取数据库连接失败")
sleep_time = 60
except Exception as e:
logging.error(f"删除消息时发生错误: {e}")
sleep_time = 60
finally:
if conn:
release_db_connection(conn)
time.sleep(sleep_time)
def handle_delete_message_error(cursor, conn, chat_id, message_id, e):
if e.error_code == 400 and "message to delete not found" in e.description:
logging.warning(f"消息 {message_id} 在聊天 {chat_id} 中未找到,已从数据库中移除")
cursor.execute(
"DELETE FROM bot_messages WHERE chat_id = %s AND message_id = %s",
(chat_id, message_id),
)
conn.commit()
elif e.error_code == 400 and "message can't be deleted for everyone" in e.description:
logging.warning(f"消息 {message_id} 无法删除,可能已过期或没有权限")
cursor.execute(
"DELETE FROM bot_messages WHERE chat_id = %s AND message_id = %s",
(chat_id, message_id),
)
conn.commit()
else:
logging.error(f"删除消息失败: {e}")