File size: 5,654 Bytes
4ffd549
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43e4c76
4ffd549
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import os
import logging
import psycopg2
from psycopg2 import pool
from urllib.parse import urlparse
from dotenv import load_dotenv
import atexit
import time

load_dotenv()

supabase_text = os.environ.get("SUPABASE_TEXT")
if not supabase_text:
    logging.error("未设置 SUPABASE_TEXT 环境变量")
    exit(1)

db_url = urlparse(supabase_text)

connection_pool = None
initialized = False

def initialize_connection_pool():
    global connection_pool, initialized
    if initialized:
        return True

    retries = 3
    for attempt in range(retries):
        try:
            connection_pool = pool.SimpleConnectionPool(
                minconn=1,
                maxconn=10,
                user=db_url.username,
                password=db_url.password,
                host=db_url.hostname,
                port=db_url.port,
                database=db_url.path[1:]
            )
            logging.info("数据库连接池初始化成功")
            initialized = True
            return True
        except psycopg2.OperationalError as e:
            logging.error(f"数据库连接池初始化失败 (尝试 {attempt + 1}/{retries}): {e}")
            logging.exception(e)
            if attempt < retries - 1:
                time.sleep(5)
            else:
                return False

def get_db_connection():
    global connection_pool, initialized
    if not initialized and not initialize_connection_pool():
        return None

    try:
        conn = connection_pool.getconn()
        if conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
            return conn
    except psycopg2.pool.PoolError as e:
        logging.error(f"获取数据库连接失败: {e}")
        logging.info("尝试重新初始化连接池...")
        initialized = False
        if initialize_connection_pool():
            return get_db_connection()
        else:
            return None
    except psycopg2.OperationalError as e:
        logging.error(f"接通时操作失误: {e}")
        return None

def release_db_connection(conn):
    global connection_pool
    if conn and connection_pool and not conn.closed:
        try:
            connection_pool.putconn(conn)
        except psycopg2.pool.PoolError as e:
            logging.error(f"释放数据库连接失败: {e}")
            conn.close()
    else:
        if not conn:
            logging.warning("释放数据库连接失败: 连接无效")
        elif not connection_pool:
            logging.warning("释放数据库连接失败: 连接池无效")
        elif conn.closed:
            logging.warning("释放数据库连接失败: 连接已关闭")

def create_tables():
    conn = get_db_connection()
    if not conn:
        return

    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS user_configs (
                    user_id BIGINT,
                    chat_id BIGINT,
                    ai_api TEXT,
                    ai_model TEXT,
                    ai_key TEXT,
                    ai_prompt TEXT,
                    max_token INTEGER,
                    message_delete_delay INTEGER,
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    PRIMARY KEY (user_id, chat_id)
                );
                CREATE TABLE IF NOT EXISTS user_sessions (
                    session_key TEXT PRIMARY KEY,
                    messages JSONB,
                    message_ids JSONB,
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
                );
                CREATE TABLE IF NOT EXISTS bot_messages (
                    chat_id BIGINT,
                    message_id BIGINT,
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    PRIMARY KEY (chat_id, message_id)
                );
            """)
            conn.commit()
            logging.info("数据库表创建成功")
    except Exception as e:
        logging.error(f"创建数据库表失败: {e}")
    finally:
        release_db_connection(conn)

def reset_database(chat_id, user_id, call, bot, send_bot_message):
    conn = get_db_connection()
    if not conn:
        logging.error("获取数据库连接失败,无法重置数据库")
        return

    try:
        with conn.cursor() as cursor:
            cursor.execute("SET statement_timeout = 60000;")
            cursor.execute("DROP TABLE IF EXISTS user_configs CASCADE;")
            cursor.execute("DROP TABLE IF EXISTS user_sessions CASCADE;")
            cursor.execute("DROP TABLE IF EXISTS bot_messages CASCADE;")
            conn.commit()
            logging.info(f"用户 {user_id} 已删除所有数据库表")

            create_tables()
            logging.info(f"用户 {user_id} 已重新创建数据库表")

            bot.answer_callback_query(call.id, "数据库已重置,所有数据已清除")
            send_bot_message(bot, chat_id, "数据库已重置,所有数据已清除", parse_mode="MarkdownV2")
            logging.info(f"用户 {user_id} 重置了数据库,聊天 ID: {chat_id}")
    except Exception as e:
        logging.error(f"重置数据库失败: {e}")
        bot.answer_callback_query(call.id, f"重置数据库失败: {e}")
    finally:
        release_db_connection(conn)

@atexit.register
def close_connection_pool():
    global connection_pool
    if connection_pool:
        connection_pool.closeall()
        logging.info("数据库连接池已关闭")

if __name__ == "__main__":
    if initialize_connection_pool():
        create_tables()