import time import logging import json import requests import telegramify_markdown from telebot.apihelper import ApiTelegramException from db_utils import get_db_connection, release_db_connection from telebot import types def call_ai_stream_api(api_url, model, messages, api_key): headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}', } api_url = ( api_url.rstrip("/") + "/chat/completions" if not api_url.endswith("/chat/completions") else api_url ) data = { "messages": messages, "model": model, "stream": True, "temperature": 1.0, } try: with requests.post( api_url, json=data, headers=headers, stream=True ) as response: response.raise_for_status() for line in response.iter_lines(): if line: decoded_line = line.decode('utf-8').replace("data: ", "") if decoded_line.strip() == "[DONE]": break try: json_data = json.loads(decoded_line) if choices := json_data.get("choices"): if ( content := choices[0] .get("delta", {}) .get("content") or choices[0].get("message", {}).get("content") ): yield content except json.JSONDecodeError as e: logging.error(f"JSON 解码失败: {e},数据: {decoded_line}") except requests.exceptions.RequestException as e: logging.error(f"AI API 请求失败: {e}") raise def send_bot_message( bot, chat_id, message, parse_mode='MarkdownV2', disable_web_page_preview=None, reply_to_message_id=None, ): try: if parse_mode == "HTML": sent_message = bot.send_message( chat_id, message, parse_mode="HTML", disable_web_page_preview=disable_web_page_preview, reply_to_message_id=reply_to_message_id, ) elif parse_mode == "MarkdownV2": escaped_message = telegramify_markdown.markdownify(message) sent_message = bot.send_message( chat_id, escaped_message, parse_mode='MarkdownV2', disable_web_page_preview=disable_web_page_preview, reply_to_message_id=reply_to_message_id, ) else: sent_message = bot.send_message( chat_id, message, disable_web_page_preview=disable_web_page_preview, reply_to_message_id=reply_to_message_id, ) return sent_message except ApiTelegramException as e: error_message = str(e) logging.error(f"发送消息到聊天 {chat_id} 失败: {error_message}") bot.send_message(chat_id, f"错误: {error_message}", parse_mode='MarkdownV2') except Exception as e: logging.error(f"发送消息到聊天 {chat_id} 失败: {e}") bot.send_message(chat_id, f"错误: {e}", parse_mode='MarkdownV2') def edit_bot_message(bot, chat_id, message_id, text, parse_mode='MarkdownV2'): try: bot.edit_message_text( chat_id=chat_id, message_id=message_id, text=text, parse_mode=parse_mode ) return True except ApiTelegramException as e: if e.error_code == 429: retry_after = int(e.description.split()[-1]) logging.warning(f"达到速率限制,将在 {retry_after} 秒后重试") time.sleep(retry_after) elif e.error_code == 400 and "message is not modified" in e.description: logging.warning("消息内容未更改,跳过更新") else: logging.error(f"更新消息失败: {e}") return False def process_stream_response( bot, user_id, chat_id, reply_to_message_id, session_key, config ): ( base_delay, max_delay, reset_threshold, delay, retry_count, success_count, ) = (0.5, 5, 5, 0.5, 0, 0) full_response = "" message_id = None last_edit_time = time.time() conn = get_db_connection() if not conn: return try: with conn.cursor() as cursor: session = get_session_data(cursor, session_key) final_messages = ( [{"role": "system", "content": config.get("ai_prompt")}] if config.get("ai_prompt") else [] ) final_messages.extend(session["messages"]) for content_chunk in call_ai_stream_api( config["ai_api"], config["ai_model"], final_messages, config["ai_key"] ): full_response += content_chunk if content_chunk: if message_id is None: message_id = send_initial_message( bot, chat_id, full_response, reply_to_message_id ) if not message_id: return elif time.time() - last_edit_time >= 1: if not edit_bot_message( bot, chat_id, message_id, telegramify_markdown.markdownify(full_response), ): continue last_edit_time = time.time() time.sleep(delay) delay, success_count = base_delay, success_count + 1 if success_count >= reset_threshold: retry_count, success_count = 0, 0 if message_id: edit_bot_message( bot, chat_id, message_id, telegramify_markdown.markdownify(full_response) ) update_session_data( cursor, session_key, session, full_response, message_id, config.get("max_token"), ) conn.commit() logging.info(f"消息 {message_id} 已添加到删除队列,聊天 ID: {chat_id}") except Exception as e: send_bot_message( bot, chat_id, f"*请求错误*\n`{str(e)}`", reply_to_message_id=reply_to_message_id, ) logging.error(f"处理流式响应失败: {e}, 用户ID: {user_id}, 聊天ID: {chat_id}") finally: release_db_connection(conn) def get_session_data(cursor, session_key): cursor.execute( "SELECT messages, message_ids FROM user_sessions WHERE session_key = %s", (session_key,), ) session_row = cursor.fetchone() return ( {"messages": session_row[0], "message_ids": session_row[1]} if session_row else {"messages": [], "message_ids": []} ) def update_session_data(cursor, session_key, session, full_response, message_id, max_token): session["messages"].append({"role": "assistant", "content": full_response}) session["message_ids"].append(message_id) if max_token and len(session["messages"]) > max_token * 2: session["message_ids"], session["messages"] = ( session["message_ids"][2:], session["messages"][2:], ) cursor.execute( """ INSERT INTO user_sessions (session_key, messages, message_ids) VALUES (%s, %s, %s) ON CONFLICT (session_key) DO UPDATE SET messages = EXCLUDED.messages, message_ids = EXCLUDED.message_ids """, ( session_key, json.dumps(session["messages"]), json.dumps(session["message_ids"]), ), ) def send_initial_message(bot, chat_id, full_response, reply_to_message_id): escaped_text = telegramify_markdown.markdownify(full_response) sent_message = send_bot_message( bot, chat_id, escaped_text, reply_to_message_id=reply_to_message_id ) if sent_message: add_message_to_delete_queue(chat_id, sent_message.message_id) logging.info(f"已发送初始消息 {sent_message.message_id} 到聊天 {chat_id},并添加到删除队列") return sent_message.message_id else: send_bot_message( bot, chat_id, "*请求错误*\n`初始消息发送失败`", reply_to_message_id=reply_to_message_id ) logging.error(f"初始消息发送失败,聊天ID: {chat_id}") return None def add_message_to_delete_queue(chat_id, message_id): conn = get_db_connection() if not conn: return try: with conn.cursor() as cursor: cursor.execute( "INSERT INTO bot_messages (chat_id, message_id) VALUES (%s, %s) ON CONFLICT (chat_id, message_id) DO NOTHING", (chat_id, message_id), ) conn.commit() logging.info(f"消息 {message_id} 已添加到删除队列,聊天 ID: {chat_id}") except Exception as e: logging.error(f"将消息 {message_id} 添加到删除队列失败: {e}") finally: release_db_connection(conn) def delete_bot_messages(bot): while True: conn = None sleep_time = 30 try: conn = get_db_connection() if conn: with conn.cursor() as cursor: cursor.execute("SELECT MIN(message_delete_delay) FROM user_configs") min_delay = cursor.fetchone()[0] if min_delay is None: min_delay = 60 sleep_time = 30 else: sleep_time = max(min_delay // 2, 1) cursor.execute( """ SELECT bot_messages.chat_id, bot_messages.message_id, user_configs.message_delete_delay FROM bot_messages JOIN user_configs ON bot_messages.chat_id = user_configs.chat_id WHERE bot_messages.created_at <= current_timestamp - interval '%s seconds' """, (min_delay,), ) messages_to_delete = cursor.fetchall() for chat_id, message_id, delay in messages_to_delete: try: if delay is not None: bot.delete_message(chat_id, message_id) logging.info(f"已删除消息 {message_id},聊天 ID: {chat_id}") cursor.execute( "DELETE FROM bot_messages WHERE chat_id = %s AND message_id = %s", (chat_id, message_id), ) conn.commit() except ApiTelegramException as e: logging.error( f"删除消息 {message_id} 失败,聊天 ID: {chat_id},错误: {e}" ) handle_delete_message_error( cursor, conn, chat_id, message_id, e ) except Exception as e: logging.error( f"处理消息 {message_id} 时发生未知错误,聊天 ID: {chat_id},错误: {e}" ) else: logging.error("获取数据库连接失败") sleep_time = 60 except Exception as e: logging.error(f"删除消息时发生错误: {e}") sleep_time = 60 finally: if conn: release_db_connection(conn) time.sleep(sleep_time) def handle_delete_message_error(cursor, conn, chat_id, message_id, e): if e.error_code == 400 and "message to delete not found" in e.description: logging.warning(f"消息 {message_id} 在聊天 {chat_id} 中未找到,已从数据库中移除") cursor.execute( "DELETE FROM bot_messages WHERE chat_id = %s AND message_id = %s", (chat_id, message_id), ) conn.commit() elif e.error_code == 400 and "message can't be deleted for everyone" in e.description: logging.warning(f"消息 {message_id} 无法删除,可能已过期或没有权限") cursor.execute( "DELETE FROM bot_messages WHERE chat_id = %s AND message_id = %s", (chat_id, message_id), ) conn.commit() else: logging.error(f"删除消息失败: {e}")