File size: 13,034 Bytes
4ffd549
 
 
 
 
 
 
 
 
 
49e42ce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ffd549
49e42ce
 
 
4ffd549
 
 
 
 
 
 
 
 
49e42ce
 
 
 
 
 
4ffd549
 
 
 
 
 
 
49e42ce
 
 
 
 
 
 
 
4ffd549
cd182cf
49e42ce
 
 
 
 
 
 
cd182cf
 
49e42ce
 
 
 
 
 
 
4ffd549
49e42ce
 
 
 
 
 
cd182cf
4ffd549
cd182cf
 
 
4ffd549
 
cd182cf
4ffd549
 
 
49e42ce
 
 
4ffd549
 
 
 
 
 
 
 
 
 
 
 
49e42ce
 
 
 
 
 
 
 
 
 
 
4ffd549
 
 
 
 
 
 
 
 
 
49e42ce
 
 
 
 
4ffd549
 
49e42ce
 
 
4ffd549
 
 
49e42ce
 
 
4ffd549
 
 
49e42ce
 
 
 
 
 
4ffd549
 
 
 
 
 
 
 
49e42ce
 
 
4ffd549
49e42ce
 
 
 
 
 
 
 
4ffd549
49e42ce
4ffd549
49e42ce
 
 
 
 
 
4ffd549
 
 
 
 
49e42ce
 
 
 
4ffd549
49e42ce
 
 
 
 
4ffd549
 
 
 
 
49e42ce
 
 
 
 
 
4ffd549
 
49e42ce
 
 
 
 
 
 
4ffd549
 
 
49e42ce
 
 
4ffd549
49e42ce
 
4ffd549
 
49e42ce
 
 
4ffd549
 
 
 
 
 
 
 
 
49e42ce
 
 
 
4ffd549
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49e42ce
 
4ffd549
 
 
 
49e42ce
 
 
4ffd549
 
 
 
 
 
49e42ce
 
 
 
 
4ffd549
 
49e42ce
 
 
 
 
 
4ffd549
49e42ce
 
 
4ffd549
 
 
 
 
 
 
 
 
 
 
 
 
 
49e42ce
 
 
 
4ffd549
 
 
49e42ce
 
 
 
4ffd549
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
import time
import logging
import json
import requests
import telegramify_markdown
from telebot.apihelper import ApiTelegramException
from db_utils import get_db_connection, release_db_connection
from telebot import types

def call_ai_stream_api(api_url, model, messages, api_key):
    """Stream completion text from an OpenAI-compatible chat endpoint.

    Args:
        api_url: Base URL of the API; ``/chat/completions`` is appended
            unless the URL already ends with it.
        model: Model identifier placed in the request body.
        messages: List of chat message dicts sent as ``messages``.
        api_key: Token sent as a ``Bearer`` authorization header.

    Yields:
        Each non-empty piece of text found in ``choices[0].delta.content``
        or, failing that, ``choices[0].message.content``.

    Raises:
        requests.exceptions.RequestException: Logged and re-raised when the
            HTTP request fails (including timeouts).
    """
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {api_key}',
    }
    if not api_url.endswith("/chat/completions"):
        api_url = api_url.rstrip("/") + "/chat/completions"
    data = {
        "messages": messages,
        "model": model,
        "stream": True,
        "temperature": 1.0,
    }
    # (connect, read) timeouts: without them a stalled server would block
    # the calling thread forever. The read timeout is per received chunk.
    request_timeout = (10, 300)
    try:
        with requests.post(
            api_url,
            json=data,
            headers=headers,
            stream=True,
            timeout=request_timeout,
        ) as response:
            response.raise_for_status()
            for raw_line in response.iter_lines():
                if not raw_line:
                    continue
                line = raw_line.decode('utf-8').strip()
                if line.startswith("data:"):
                    # Strip only the leading SSE field name; a global
                    # replace would also eat "data: " inside the content.
                    payload = line[len("data:"):].strip()
                elif line.startswith((":", "event:", "id:", "retry:")):
                    # SSE comments / non-data fields carry no completion text.
                    continue
                else:
                    # Non-SSE servers may answer with bare JSON lines.
                    payload = line
                if not payload:
                    continue
                if payload == "[DONE]":
                    break
                try:
                    json_data = json.loads(payload)
                except json.JSONDecodeError as e:
                    logging.error(f"JSON 解码失败: {e},数据: {payload}")
                    continue
                if not isinstance(json_data, dict):
                    continue
                choices = json_data.get("choices")
                if not choices or not isinstance(choices[0], dict):
                    continue
                first_choice = choices[0]
                # ``delta``/``message`` may be present but null.
                content = (first_choice.get("delta") or {}).get("content") or (
                    first_choice.get("message") or {}
                ).get("content")
                if content:
                    yield content
    except requests.exceptions.RequestException as e:
        logging.error(f"AI API 请求失败: {e}")
        raise

def send_bot_message(
    bot,
    chat_id,
    message,
    parse_mode='MarkdownV2',
    disable_web_page_preview=None,
    reply_to_message_id=None,
):
    """Send ``message`` to ``chat_id`` and return the sent message object.

    Args:
        bot: Object exposing ``send_message(chat_id, text, **kwargs)``.
        chat_id: Target chat identifier.
        message: Text to send. With ``parse_mode='MarkdownV2'`` it is first
            converted with ``telegramify_markdown.markdownify``.
        parse_mode: ``"HTML"`` and ``"MarkdownV2"`` are forwarded to the bot;
            any other value sends the text without a parse mode.
        disable_web_page_preview: Forwarded to ``bot.send_message``.
        reply_to_message_id: Forwarded to ``bot.send_message``.

    Returns:
        Whatever ``bot.send_message`` returns, or ``None`` when sending
        failed (in which case a plain-text error notice is attempted).
    """
    send_kwargs = {
        "disable_web_page_preview": disable_web_page_preview,
        "reply_to_message_id": reply_to_message_id,
    }
    try:
        if parse_mode == "MarkdownV2":
            message = telegramify_markdown.markdownify(message)
        if parse_mode in ("HTML", "MarkdownV2"):
            send_kwargs["parse_mode"] = parse_mode
        return bot.send_message(chat_id, message, **send_kwargs)
    except Exception as e:
        logging.error(f"发送消息到聊天 {chat_id} 失败: {e}")
        try:
            # Plain text on purpose: the raw error text is not escaped for
            # MarkdownV2, so sending it with a parse mode could fail again.
            bot.send_message(chat_id, f"错误: {e}")
        except Exception as notify_error:
            # Never let the error notice raise out of the handler.
            logging.error(f"发送错误提示到聊天 {chat_id} 失败: {notify_error}")
        return None

def _retry_after_seconds(error, default=1):
    """Return the wait time in seconds advertised by a rate-limit error."""
    result_json = getattr(error, "result_json", None)
    if isinstance(result_json, dict):
        parameters = result_json.get("parameters")
        if isinstance(parameters, dict):
            value = parameters.get("retry_after")
            if isinstance(value, (int, float)):
                return max(int(value), 0)
    try:
        # Fallback: take the last token of the error description.
        return max(int((error.description or "").split()[-1]), 0)
    except (IndexError, ValueError):
        return default


def edit_bot_message(bot, chat_id, message_id, text, parse_mode='MarkdownV2'):
    """Replace the text of an existing message.

    On a 429 (rate limit) error the function waits for the advertised delay
    and retries once; previously it waited but never retried although the
    log said it would.

    Args:
        bot: Object exposing ``edit_message_text(**kwargs)``.
        chat_id: Chat containing the message.
        message_id: Identifier of the message to edit.
        text: New text, passed through unchanged.
        parse_mode: Parse mode forwarded to the bot.

    Returns:
        ``True`` when the edit succeeded, ``False`` otherwise (rate limit
        persisted, text unchanged, or any other API error).
    """
    max_attempts = 2
    for _ in range(max_attempts):
        try:
            bot.edit_message_text(
                chat_id=chat_id, message_id=message_id, text=text, parse_mode=parse_mode
            )
            return True
        except ApiTelegramException as e:
            description = e.description or ""
            if e.error_code == 429:
                retry_after = _retry_after_seconds(e)
                logging.warning(f"达到速率限制,将在 {retry_after} 秒后重试")
                time.sleep(retry_after)
                continue
            if e.error_code == 400 and "message is not modified" in description:
                logging.warning("消息内容未更改,跳过更新")
            else:
                logging.error(f"更新消息失败: {e}")
            return False
    return False

def process_stream_response(
    bot, user_id, chat_id, reply_to_message_id, session_key, config
):
    """Stream an AI reply into a chat message and persist it to the session.

    The first received chunk is sent as a new message; later chunks are
    applied by editing that message at most about once per second. When the
    stream ends the message is edited one last time with the full text and
    the session row is updated and committed.

    Args:
        bot: Bot object passed through to the send/edit helpers.
        user_id: Only used in the error log.
        chat_id: Chat that receives the reply.
        reply_to_message_id: Message the reply is attached to.
        session_key: Key of the ``user_sessions`` row to read and update.
        config: Mapping providing ``ai_api``, ``ai_model``, ``ai_key`` and
            optionally ``ai_prompt`` and ``max_token``.

    Returns:
        None. Failures are logged and reported to the chat.
    """
    min_edit_interval = 1  # seconds between two edits of the streamed message
    post_edit_pause = 0.5  # extra pause after every successful edit
    full_response = ""
    message_id = None
    last_edit_time = time.time()

    conn = get_db_connection()
    if not conn:
        logging.error("获取数据库连接失败")
        return

    try:
        with conn.cursor() as cursor:
            session = get_session_data(cursor, session_key)
            system_prompt = config.get("ai_prompt")
            final_messages = (
                [{"role": "system", "content": system_prompt}]
                if system_prompt
                else []
            )
            final_messages.extend(session["messages"])

            for content_chunk in call_ai_stream_api(
                config["ai_api"], config["ai_model"], final_messages, config["ai_key"]
            ):
                if not content_chunk:
                    continue
                full_response += content_chunk
                if message_id is None:
                    message_id = send_initial_message(
                        bot, chat_id, full_response, reply_to_message_id
                    )
                    if not message_id:
                        return
                elif time.time() - last_edit_time >= min_edit_interval:
                    # A failed edit leaves last_edit_time untouched so the
                    # next chunk triggers another attempt right away.
                    if edit_bot_message(
                        bot,
                        chat_id,
                        message_id,
                        telegramify_markdown.markdownify(full_response),
                    ):
                        last_edit_time = time.time()
                        time.sleep(post_edit_pause)

            if message_id is None:
                # The stream produced no text: do not store an empty
                # assistant turn with a missing message id in the session.
                logging.warning(f"AI 未返回任何内容, 用户ID: {user_id}, 聊天ID: {chat_id}")
                send_bot_message(
                    bot,
                    chat_id,
                    "*请求错误*\n`AI 未返回任何内容`",
                    reply_to_message_id=reply_to_message_id,
                )
                return

            # Final edit so the message shows the complete response.
            edit_bot_message(
                bot, chat_id, message_id, telegramify_markdown.markdownify(full_response)
            )

            update_session_data(
                cursor,
                session_key,
                session,
                full_response,
                message_id,
                config.get("max_token"),
            )
            conn.commit()
            # NOTE(review): this text mirrors the delete-queue log; the queue
            # insert itself happens in send_initial_message.
            logging.info(f"消息 {message_id} 已添加到删除队列,聊天 ID: {chat_id}")
    except Exception as e:
        send_bot_message(
            bot,
            chat_id,
            f"*请求错误*\n`{str(e)}`",
            reply_to_message_id=reply_to_message_id,
        )
        logging.error(f"处理流式响应失败: {e}, 用户ID: {user_id}, 聊天ID: {chat_id}")
    finally:
        release_db_connection(conn)

def get_session_data(cursor, session_key):
    """Fetch the stored conversation for ``session_key``.

    Args:
        cursor: DB cursor exposing ``execute`` and ``fetchone``.
        session_key: Value matched against ``user_sessions.session_key``.

    Returns:
        A dict with keys ``"messages"`` and ``"message_ids"`` taken from the
        first two columns of the matching row, or two empty lists when no
        row exists.
    """
    query = "SELECT messages, message_ids FROM user_sessions WHERE session_key = %s"
    cursor.execute(query, (session_key,))
    row = cursor.fetchone()
    if not row:
        # Unknown session: start with an empty history.
        return {"messages": [], "message_ids": []}
    return {"messages": row[0], "message_ids": row[1]}

def update_session_data(cursor, session_key, session, full_response, message_id, max_token):
    """Append the assistant reply to ``session`` and upsert it into the DB.

    Args:
        cursor: DB cursor exposing ``execute``.
        session_key: Key of the ``user_sessions`` row.
        session: Dict with ``"messages"`` and ``"message_ids"`` lists; it is
            modified in place.
        full_response: Assistant text stored as a new message.
        message_id: Identifier appended to ``session["message_ids"]``.
        max_token: When truthy and the history holds more than
            ``max_token * 2`` messages, the two oldest entries of both lists
            are dropped.

    Returns:
        None. The caller is responsible for committing.
    """
    session["messages"].append({"role": "assistant", "content": full_response})
    session["message_ids"].append(message_id)

    over_limit = bool(max_token) and len(session["messages"]) > max_token * 2
    if over_limit:
        # Drop the two oldest entries from both lists.
        trimmed_ids = session["message_ids"][2:]
        trimmed_messages = session["messages"][2:]
        session["message_ids"] = trimmed_ids
        session["messages"] = trimmed_messages

    upsert_sql = """
        INSERT INTO user_sessions (session_key, messages, message_ids) VALUES (%s, %s, %s)
        ON CONFLICT (session_key) DO UPDATE SET messages = EXCLUDED.messages, message_ids = EXCLUDED.message_ids
    """
    serialized_messages = json.dumps(session["messages"])
    serialized_ids = json.dumps(session["message_ids"])
    cursor.execute(upsert_sql, (session_key, serialized_messages, serialized_ids))

def send_initial_message(bot, chat_id, full_response, reply_to_message_id):
    """Send the first chunk of a streamed reply and queue it for deletion.

    Args:
        bot: Bot object passed to ``send_bot_message``.
        chat_id: Chat that receives the message.
        full_response: Raw (unescaped) response text received so far.
        reply_to_message_id: Message the reply is attached to.

    Returns:
        The ``message_id`` of the sent message, or ``None`` when sending
        failed (an error notice is sent to the chat in that case).
    """
    # Pass the raw text: send_bot_message already converts it with
    # markdownify for MarkdownV2, and converting here too would run the
    # text through the converter twice.
    sent_message = send_bot_message(
        bot, chat_id, full_response, reply_to_message_id=reply_to_message_id
    )
    if not sent_message:
        send_bot_message(
            bot, chat_id, "*请求错误*\n`初始消息发送失败`", reply_to_message_id=reply_to_message_id
        )
        logging.error(f"初始消息发送失败,聊天ID: {chat_id}")
        return None

    add_message_to_delete_queue(chat_id, sent_message.message_id)
    logging.info(f"已发送初始消息 {sent_message.message_id} 到聊天 {chat_id},并添加到删除队列")
    return sent_message.message_id

def add_message_to_delete_queue(chat_id, message_id):
    """Record a sent message in ``bot_messages`` so it can be deleted later.

    The insert is skipped by the database when the ``(chat_id, message_id)``
    pair already exists. Nothing happens when no DB connection is available;
    any other failure is logged rather than raised.

    Args:
        chat_id: Chat the message was sent to.
        message_id: Identifier of the sent message.
    """
    conn = get_db_connection()
    if conn:
        insert_sql = "INSERT INTO bot_messages (chat_id, message_id) VALUES (%s, %s) ON CONFLICT (chat_id, message_id) DO NOTHING"
        try:
            with conn.cursor() as cursor:
                cursor.execute(insert_sql, (chat_id, message_id))
                conn.commit()
                logging.info(f"消息 {message_id} 已添加到删除队列,聊天 ID: {chat_id}")
        except Exception as err:
            logging.error(f"将消息 {message_id} 添加到删除队列失败: {err}")
        finally:
            # Always hand the connection back, whether or not the insert worked.
            release_db_connection(conn)

def _delete_due_messages(bot, conn):
    """Delete every queued message whose chat-specific delay has elapsed.

    Returns the number of seconds the caller should sleep before the next
    pass: half of the smallest configured delay (at least 1), or 30 when no
    delay is configured.
    """
    with conn.cursor() as cursor:
        cursor.execute("SELECT MIN(message_delete_delay) FROM user_configs")
        min_delay = cursor.fetchone()[0]
        sleep_time = 30 if min_delay is None else max(min_delay // 2, 1)

        # Compare each message against the delay of ITS OWN chat. Using the
        # global minimum here would delete messages of chats with a longer
        # delay too early. Rows without a configured delay are never due.
        cursor.execute(
            """
            SELECT bot_messages.chat_id, bot_messages.message_id
            FROM bot_messages
            JOIN user_configs ON bot_messages.chat_id = user_configs.chat_id
            WHERE user_configs.message_delete_delay IS NOT NULL
              AND bot_messages.created_at
                  <= current_timestamp
                     - user_configs.message_delete_delay * interval '1 second'
            """
        )
        messages_to_delete = cursor.fetchall()

        for chat_id, message_id in messages_to_delete:
            try:
                bot.delete_message(chat_id, message_id)
                logging.info(f"已删除消息 {message_id},聊天 ID: {chat_id}")
                cursor.execute(
                    "DELETE FROM bot_messages WHERE chat_id = %s AND message_id = %s",
                    (chat_id, message_id),
                )
                conn.commit()
            except ApiTelegramException as e:
                logging.error(
                    f"删除消息 {message_id} 失败,聊天 ID: {chat_id},错误: {e}"
                )
                handle_delete_message_error(cursor, conn, chat_id, message_id, e)
            except Exception as e:
                logging.error(
                    f"处理消息 {message_id} 时发生未知错误,聊天 ID: {chat_id},错误: {e}"
                )
    return sleep_time


def delete_bot_messages(bot):
    """Run forever, periodically deleting bot messages that are due.

    Each pass acquires a DB connection, deletes the due messages, releases
    the connection and sleeps. The sleep is derived from the smallest
    configured delete delay, or 60 seconds after a failed pass.

    Args:
        bot: Object exposing ``delete_message(chat_id, message_id)``.

    This function never returns; it is meant to run in its own thread.
    """
    while True:
        conn = None
        sleep_time = 30
        try:
            conn = get_db_connection()
            if conn:
                sleep_time = _delete_due_messages(bot, conn)
            else:
                logging.error("获取数据库连接失败")
                sleep_time = 60
        except Exception as e:
            logging.error(f"删除消息时发生错误: {e}")
            sleep_time = 60
        finally:
            if conn:
                release_db_connection(conn)
            time.sleep(sleep_time)

def handle_delete_message_error(cursor, conn, chat_id, message_id, e):
    """React to an API error raised while deleting a queued message.

    For the two 400 errors "message to delete not found" and "message can't
    be deleted for everyone", the queue row is removed and the change is
    committed. Every other error is only logged.

    Args:
        cursor: DB cursor exposing ``execute``.
        conn: DB connection exposing ``commit``.
        chat_id: Chat the message belongs to.
        message_id: Identifier of the message that could not be deleted.
        e: Error object exposing ``error_code`` and ``description``.
    """
    warning_text = None
    if e.error_code == 400:
        description = e.description
        if "message to delete not found" in description:
            warning_text = f"消息 {message_id} 在聊天 {chat_id} 中未找到,已从数据库中移除"
        elif "message can't be deleted for everyone" in description:
            warning_text = f"消息 {message_id} 无法删除,可能已过期或没有权限"

    if warning_text is None:
        # Unrecognised failure: keep the row in the queue.
        logging.error(f"删除消息失败: {e}")
        return

    logging.warning(warning_text)
    cursor.execute(
        "DELETE FROM bot_messages WHERE chat_id = %s AND message_id = %s",
        (chat_id, message_id),
    )
    conn.commit()