| import time | |
| import logging | |
| import json | |
| import requests | |
| import telegramify_markdown | |
| from telebot.apihelper import ApiTelegramException | |
| from db_utils import get_db_connection, release_db_connection | |
| from telebot import types | |
def call_ai_stream_api(api_url, model, messages, api_key):
    """Stream completion text from a ``/chat/completions`` style endpoint.

    Args:
        api_url: Base URL or full endpoint URL. ``/chat/completions`` is
            appended when the URL does not already end with it.
        model: Model identifier placed in the request body.
        messages: Chat messages placed in the request body.
        api_key: Token sent in the ``Authorization: Bearer`` header.

    Yields:
        Each non-empty piece of text found in ``choices[0].delta.content``
        or, failing that, in ``choices[0].message.content``.

    Raises:
        requests.exceptions.RequestException: Logged and re-raised when the
            request fails or the server answers with an error status.
    """
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {api_key}',
    }
    if not api_url.endswith("/chat/completions"):
        api_url = api_url.rstrip("/") + "/chat/completions"
    data = {
        "messages": messages,
        "model": model,
        "stream": True,
        "temperature": 1.0,
    }
    # NOTE(review): no timeout is passed, so a stalled server blocks the
    # caller indefinitely; pick a value that suits the models in use.
    try:
        with requests.post(
            api_url, json=data, headers=headers, stream=True
        ) as response:
            response.raise_for_status()
            for line in response.iter_lines():
                if not line:
                    continue
                decoded_line = line.decode('utf-8').strip()
                if decoded_line.startswith("data:"):
                    # Strip only the leading field name; a global replace
                    # would also eat "data: " inside the generated text.
                    decoded_line = decoded_line[len("data:"):].strip()
                elif decoded_line.startswith((":", "event:", "id:", "retry:")):
                    # Stream comments / keep-alives and non-data fields
                    # carry no completion payload.
                    continue
                if not decoded_line:
                    continue
                if decoded_line == "[DONE]":
                    break
                try:
                    json_data = json.loads(decoded_line)
                except json.JSONDecodeError as e:
                    logging.error(f"JSON 解码失败: {e},数据: {decoded_line}")
                    continue
                if not isinstance(json_data, dict):
                    continue
                choices = json_data.get("choices")
                if not choices:
                    continue
                first_choice = choices[0] or {}
                # ``or {}`` tolerates chunks that carry "delta": null.
                content = (first_choice.get("delta") or {}).get("content") or (
                    first_choice.get("message") or {}
                ).get("content")
                if content:
                    yield content
    except requests.exceptions.RequestException as e:
        logging.error(f"AI API 请求失败: {e}")
        raise
def send_bot_message(
    bot,
    chat_id,
    message,
    parse_mode='MarkdownV2',
    disable_web_page_preview=None,
    reply_to_message_id=None,
):
    """Send ``message`` to ``chat_id`` and return the sent message object.

    Args:
        bot: Object exposing ``send_message``.
        chat_id: Target chat.
        message: Text to send. With ``parse_mode='MarkdownV2'`` it is first
            converted by ``telegramify_markdown.markdownify``.
        parse_mode: ``"HTML"`` and ``"MarkdownV2"`` are forwarded to the bot;
            any other value sends the text without a parse mode.
        disable_web_page_preview: Forwarded to ``bot.send_message``.
        reply_to_message_id: Forwarded to ``bot.send_message``.

    Returns:
        Whatever ``bot.send_message`` returns, or ``None`` when sending
        failed. On failure the error is logged and a plain-text notice is
        sent to the chat on a best-effort basis.
    """
    try:
        if parse_mode == "HTML":
            return bot.send_message(
                chat_id,
                message,
                parse_mode="HTML",
                disable_web_page_preview=disable_web_page_preview,
                reply_to_message_id=reply_to_message_id,
            )
        if parse_mode == "MarkdownV2":
            escaped_message = telegramify_markdown.markdownify(message)
            return bot.send_message(
                chat_id,
                escaped_message,
                parse_mode='MarkdownV2',
                disable_web_page_preview=disable_web_page_preview,
                reply_to_message_id=reply_to_message_id,
            )
        return bot.send_message(
            chat_id,
            message,
            disable_web_page_preview=disable_web_page_preview,
            reply_to_message_id=reply_to_message_id,
        )
    except Exception as e:
        # One handler is enough: the API-specific and the generic branch
        # performed the same logging and notification.
        error_message = str(e)
        logging.error(f"发送消息到聊天 {chat_id} 失败: {error_message}")

    # The notice is sent as plain text: raw exception text is full of
    # characters that MarkdownV2 requires to be escaped, so sending it with
    # that parse mode would be rejected and raise from the error path.
    try:
        bot.send_message(chat_id, f"错误: {error_message}")
    except Exception as notify_error:
        logging.error(f"向聊天 {chat_id} 发送错误通知失败: {notify_error}")
    return None
def _retry_after_seconds(error, default=1):
    """Return how many seconds a rate-limit error asks the caller to wait."""
    # Prefer the structured value from the API response when it is present.
    payload = getattr(error, "result_json", None)
    if isinstance(payload, dict):
        parameters = payload.get("parameters")
        if isinstance(parameters, dict):
            value = parameters.get("retry_after")
            if isinstance(value, (int, float)) and value >= 0:
                return value
    # Otherwise read the trailing number of the description text, and fall
    # back to ``default`` instead of raising when the text has another shape.
    tokens = str(getattr(error, "description", "") or "").split()
    try:
        return max(int(tokens[-1]), 0)
    except (IndexError, ValueError):
        return default


def edit_bot_message(bot, chat_id, message_id, text, parse_mode='MarkdownV2'):
    """Replace the text of an existing message.

    Args:
        bot: Object exposing ``edit_message_text``.
        chat_id: Chat that contains the message.
        message_id: Message to edit.
        text: New text, passed through unchanged.
        parse_mode: Parse mode forwarded to the bot.

    Returns:
        ``True`` when the edit went through, ``False`` when the API rejected
        it. On a 429 response the function sleeps for the requested time
        before returning ``False``; it does not retry the edit itself.
    """
    try:
        bot.edit_message_text(
            chat_id=chat_id, message_id=message_id, text=text, parse_mode=parse_mode
        )
        return True
    except ApiTelegramException as e:
        description = e.description or ""
        if e.error_code == 429:
            retry_after = _retry_after_seconds(e)
            logging.warning(f"达到速率限制,将在 {retry_after} 秒后重试")
            time.sleep(retry_after)
        elif e.error_code == 400 and "message is not modified" in description:
            logging.warning("消息内容未更改,跳过更新")
        else:
            logging.error(f"更新消息失败: {e}")
        return False
def process_stream_response(
    bot, user_id, chat_id, reply_to_message_id, session_key, config
):
    """Stream an AI reply into a chat message and persist the conversation.

    The stored session is loaded, the optional ``ai_prompt`` from ``config``
    is prepended as a system message, and the reply is streamed: the first
    chunk creates the message, later chunks update it at most once per
    ``edit_interval`` seconds. When the stream ends the message receives the
    complete text and the session is saved and committed.

    Args:
        bot: Bot object handed to the send/edit helpers.
        user_id: Only used in the error log.
        chat_id: Chat that receives the reply.
        reply_to_message_id: Message the reply is attached to.
        session_key: Key of the stored session.
        config: Mapping read for ``ai_api``, ``ai_model``, ``ai_key``,
            ``ai_prompt`` and ``max_token``.

    Returns:
        None. Returns early when no database connection is available or the
        first message could not be sent. Nothing is persisted when the
        stream produced no content.
    """
    edit_interval = 1  # minimum seconds between two edits of the message
    post_edit_pause = 0.5  # pause after every successful intermediate edit
    full_response = ""
    message_id = None
    last_edit_time = time.time()
    conn = get_db_connection()
    if not conn:
        return
    try:
        with conn.cursor() as cursor:
            session = get_session_data(cursor, session_key)
            system_prompt = config.get("ai_prompt")
            final_messages = (
                [{"role": "system", "content": system_prompt}]
                if system_prompt
                else []
            )
            final_messages.extend(session["messages"])
            for content_chunk in call_ai_stream_api(
                config["ai_api"], config["ai_model"], final_messages, config["ai_key"]
            ):
                if not content_chunk:
                    continue
                full_response += content_chunk
                if message_id is None:
                    message_id = send_initial_message(
                        bot, chat_id, full_response, reply_to_message_id
                    )
                    if not message_id:
                        return
                    continue
                if time.time() - last_edit_time < edit_interval:
                    continue
                # A rejected edit leaves the timer untouched, so the next
                # chunk tries again right away.
                if edit_bot_message(
                    bot,
                    chat_id,
                    message_id,
                    telegramify_markdown.markdownify(full_response),
                ):
                    last_edit_time = time.time()
                    time.sleep(post_edit_pause)
            if message_id:
                # Final edit so the message always ends with the full text.
                edit_bot_message(
                    bot, chat_id, message_id, telegramify_markdown.markdownify(full_response)
                )
                update_session_data(
                    cursor,
                    session_key,
                    session,
                    full_response,
                    message_id,
                    config.get("max_token"),
                )
                conn.commit()
                logging.info(f"消息 {message_id} 的会话记录已保存,聊天 ID: {chat_id}")
    except Exception as e:
        # Log before notifying: the notification itself can fail, and the
        # root cause must not be lost when it does.
        logging.error(f"处理流式响应失败: {e}, 用户ID: {user_id}, 聊天ID: {chat_id}")
        send_bot_message(
            bot,
            chat_id,
            f"*请求错误*\n`{str(e)}`",
            reply_to_message_id=reply_to_message_id,
        )
    finally:
        release_db_connection(conn)
def get_session_data(cursor, session_key):
    """Fetch the stored session for ``session_key``.

    Args:
        cursor: Database cursor used to run the lookup.
        session_key: Value matched against ``user_sessions.session_key``.

    Returns:
        A dict with the keys ``"messages"`` and ``"message_ids"`` holding the
        first and second column of the matching row, or two empty lists when
        no row is found.
    """
    query = "SELECT messages, message_ids FROM user_sessions WHERE session_key = %s"
    cursor.execute(query, (session_key,))
    row = cursor.fetchone()
    if not row:
        return {"messages": [], "message_ids": []}
    return {"messages": row[0], "message_ids": row[1]}
def update_session_data(cursor, session_key, session, full_response, message_id, max_token):
    """Append the assistant reply to ``session`` and upsert it into the database.

    Args:
        cursor: Database cursor used for the upsert.
        session_key: Key of the ``user_sessions`` row.
        session: Dict with ``"messages"`` and ``"message_ids"`` lists; it is
            modified by this call.
        full_response: Assistant text stored as a new message.
        message_id: Id appended to ``session["message_ids"]``.
        max_token: When truthy and the history holds more than
            ``max_token * 2`` messages, the two oldest entries of both lists
            are dropped.

    The caller is responsible for committing the transaction.
    """
    history = session["messages"]
    sent_ids = session["message_ids"]
    history.append({"role": "assistant", "content": full_response})
    sent_ids.append(message_id)

    over_limit = max_token and len(history) > max_token * 2
    if over_limit:
        session["message_ids"] = sent_ids[2:]
        session["messages"] = history[2:]

    upsert_params = (
        session_key,
        json.dumps(session["messages"]),
        json.dumps(session["message_ids"]),
    )
    cursor.execute(
        """
        INSERT INTO user_sessions (session_key, messages, message_ids) VALUES (%s, %s, %s)
        ON CONFLICT (session_key) DO UPDATE SET messages = EXCLUDED.messages, message_ids = EXCLUDED.message_ids
        """,
        upsert_params,
    )
def send_initial_message(bot, chat_id, full_response, reply_to_message_id):
    """Send the first part of a streamed reply and queue it for deletion.

    Args:
        bot: Bot object handed to ``send_bot_message``.
        chat_id: Chat that receives the message.
        full_response: Raw text received so far.
        reply_to_message_id: Message the reply is attached to.

    Returns:
        The id of the sent message, or ``None`` when sending failed (an
        error notice is then sent to the chat and the failure is logged).
    """
    # Pass the raw text: send_bot_message already converts it for its default
    # MarkdownV2 parse mode. Converting here as well would run the converter
    # over its own output and change the formatting.
    sent_message = send_bot_message(
        bot, chat_id, full_response, reply_to_message_id=reply_to_message_id
    )
    if not sent_message:
        logging.error(f"初始消息发送失败,聊天ID: {chat_id}")
        send_bot_message(
            bot, chat_id, "*请求错误*\n`初始消息发送失败`", reply_to_message_id=reply_to_message_id
        )
        return None
    add_message_to_delete_queue(chat_id, sent_message.message_id)
    logging.info(f"已发送初始消息 {sent_message.message_id} 到聊天 {chat_id},并添加到删除队列")
    return sent_message.message_id
def add_message_to_delete_queue(chat_id, message_id):
    """Record a sent message in ``bot_messages`` so it can be deleted later.

    Returns without doing anything when no database connection is available.
    Any exception raised while inserting is logged and swallowed; the
    connection is released in every case. An already recorded
    ``(chat_id, message_id)`` pair is left untouched by the insert.
    """
    connection = get_db_connection()
    if not connection:
        return
    insert_sql = "INSERT INTO bot_messages (chat_id, message_id) VALUES (%s, %s) ON CONFLICT (chat_id, message_id) DO NOTHING"
    try:
        with connection.cursor() as cur:
            cur.execute(insert_sql, (chat_id, message_id))
            connection.commit()
            logging.info(f"消息 {message_id} 已添加到删除队列,聊天 ID: {chat_id}")
    except Exception as exc:
        logging.error(f"将消息 {message_id} 添加到删除队列失败: {exc}")
    finally:
        release_db_connection(connection)
def _purge_expired_messages(bot, conn):
    """Delete all queued messages whose chat delay has elapsed; return the next poll interval."""
    with conn.cursor() as cursor:
        cursor.execute("SELECT MIN(message_delete_delay) FROM user_configs")
        min_delay = cursor.fetchone()[0]
        # Poll twice per shortest configured delay, at least once a second.
        sleep_time = 30 if min_delay is None else max(min_delay // 2, 1)

        # Every message is compared against the delay of its own chat. Using
        # the global minimum here would delete messages of chats that asked
        # for a longer delay far too early. Chats without a delay are
        # skipped. Multiplying by a one-second interval also avoids putting
        # a placeholder inside a quoted SQL literal.
        cursor.execute(
            """
            SELECT bot_messages.chat_id, bot_messages.message_id
            FROM bot_messages
            JOIN user_configs ON bot_messages.chat_id = user_configs.chat_id
            WHERE user_configs.message_delete_delay IS NOT NULL
              AND bot_messages.created_at
                  <= current_timestamp - user_configs.message_delete_delay * interval '1 second'
            """
        )
        for chat_id, message_id in cursor.fetchall():
            try:
                bot.delete_message(chat_id, message_id)
                logging.info(f"已删除消息 {message_id},聊天 ID: {chat_id}")
                cursor.execute(
                    "DELETE FROM bot_messages WHERE chat_id = %s AND message_id = %s",
                    (chat_id, message_id),
                )
                conn.commit()
            except ApiTelegramException as e:
                logging.error(
                    f"删除消息 {message_id} 失败,聊天 ID: {chat_id},错误: {e}"
                )
                handle_delete_message_error(cursor, conn, chat_id, message_id, e)
            except Exception as e:
                logging.error(
                    f"处理消息 {message_id} 时发生未知错误,聊天 ID: {chat_id},错误: {e}"
                )
                # Discard a possibly failed transaction so the remaining
                # messages of this pass can still be processed.
                conn.rollback()
    return sleep_time


def delete_bot_messages(bot):
    """Run forever, deleting queued bot messages once their delay has passed.

    Each pass reads the smallest ``message_delete_delay`` to choose the next
    sleep time (half of it, at least one second; 30 seconds when none is
    set), deletes every queued message that is older than the delay, in
    seconds, configured for its chat, and removes it from ``bot_messages``.
    When no connection is available or a pass fails, the loop sleeps 60
    seconds before trying again.

    Args:
        bot: Object exposing ``delete_message``.

    This function never returns; it is meant to run in its own thread or
    process.
    """
    while True:
        conn = None
        sleep_time = 30
        try:
            conn = get_db_connection()
            if conn:
                sleep_time = _purge_expired_messages(bot, conn)
            else:
                logging.error("获取数据库连接失败")
                sleep_time = 60
        except Exception as e:
            logging.error(f"删除消息时发生错误: {e}")
            sleep_time = 60
        finally:
            if conn:
                release_db_connection(conn)
        time.sleep(sleep_time)
def handle_delete_message_error(cursor, conn, chat_id, message_id, e):
    """React to an API error raised while deleting a message.

    For a 400 error whose description says the message was not found or
    cannot be deleted for everyone, a warning is logged, the row is removed
    from ``bot_messages`` and the change is committed. Every other error is
    only logged.

    Args:
        cursor: Database cursor used for the cleanup statement.
        conn: Connection on which the cleanup is committed.
        chat_id: Chat of the message.
        message_id: Id of the message.
        e: Error object exposing ``error_code`` and ``description``.
    """
    warning_text = None
    if e.error_code == 400:
        if "message to delete not found" in e.description:
            warning_text = f"消息 {message_id} 在聊天 {chat_id} 中未找到,已从数据库中移除"
        elif "message can't be deleted for everyone" in e.description:
            warning_text = f"消息 {message_id} 无法删除,可能已过期或没有权限"

    if warning_text is None:
        logging.error(f"删除消息失败: {e}")
        return

    # Both recognised cases mean the message can never be deleted, so the
    # queue entry is dropped.
    logging.warning(warning_text)
    cursor.execute(
        "DELETE FROM bot_messages WHERE chat_id = %s AND message_id = %s",
        (chat_id, message_id),
    )
    conn.commit()