# File size: 5,654 Bytes
# 4ffd549 43e4c76 4ffd549
import os
import logging
import psycopg2
from psycopg2 import pool
from urllib.parse import urlparse
from dotenv import load_dotenv
import atexit
import time
# python-dotenv: load variables from a .env file (if present) into the
# environment before the connection string is read.
load_dotenv()

# Connection string taken from the SUPABASE_TEXT environment variable.
supabase_text = os.environ.get("SUPABASE_TEXT")
if not supabase_text:
    logging.error("未设置 SUPABASE_TEXT 环境变量")
    # `exit()` is injected by the `site` module for interactive sessions and
    # is missing under e.g. `python -S`; raising SystemExit terminates with
    # the same status code and is always available.
    raise SystemExit(1)

# Parsed once at import; initialize_connection_pool() reads its username,
# password, hostname, port and path.
db_url = urlparse(supabase_text)

# Shared pool state, written by initialize_connection_pool().
connection_pool = None
initialized = False
def initialize_connection_pool(retries=3, retry_delay=5):
    """Create the module-wide connection pool, retrying on connection errors.

    Does nothing and returns True when the pool is already marked as
    initialized. Otherwise builds a ``SimpleConnectionPool`` (1 to 10
    connections) from the components of ``db_url``.

    Args:
        retries: Maximum number of attempts.
        retry_delay: Seconds to sleep between failed attempts.

    Returns:
        True if the pool is ready, False if every attempt raised
        ``psycopg2.OperationalError`` (or ``retries`` is 0).
    """
    global connection_pool, initialized
    if initialized:
        return True

    # Imported locally because the file only imports `urlparse` at top level.
    from urllib.parse import unquote

    def _decoded(component):
        # urlparse() returns URL components still percent-encoded, so a
        # password written as "p%40ss" must be decoded to "p@ss" before it is
        # passed to the driver as a separate keyword argument.
        return unquote(component) if component else component

    for attempt in range(retries):
        try:
            connection_pool = pool.SimpleConnectionPool(
                minconn=1,
                maxconn=10,
                user=_decoded(db_url.username),
                password=_decoded(db_url.password),
                host=db_url.hostname,
                port=db_url.port,
                database=_decoded(db_url.path[1:]),  # strip the leading "/"
            )
        except psycopg2.OperationalError as e:
            # One record carrying both the message and the traceback.
            logging.exception(
                f"数据库连接池初始化失败 (尝试 {attempt + 1}/{retries}): {e}"
            )
            if attempt < retries - 1:
                time.sleep(retry_delay)
        else:
            logging.info("数据库连接池初始化成功")
            initialized = True
            return True
    return False
def get_db_connection():
    """Borrow a working connection from the pool, or return None.

    The pool is initialized on first use. Every borrowed connection is probed
    with ``SELECT 1`` before it is handed out; the probe's transaction is left
    open, as before.

    Returns:
        A connection that the caller must give back with
        release_db_connection(), or None when the pool could not be
        initialized, no connection could be obtained, or the probe failed.
    """
    global connection_pool, initialized
    if not initialized and not initialize_connection_pool():
        return None

    try:
        conn = connection_pool.getconn()
    except psycopg2.pool.PoolError as e:
        logging.error(f"获取数据库连接失败: {e}")
        logging.info("尝试重新初始化连接池...")
        # Force initialize_connection_pool() to build a fresh pool, then retry.
        initialized = False
        if initialize_connection_pool():
            return get_db_connection()
        return None
    except psycopg2.OperationalError as e:
        logging.error(f"接通时操作失误: {e}")
        return None

    if not conn:
        return None

    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT 1")
    except (psycopg2.OperationalError, psycopg2.InterfaceError) as e:
        logging.error(f"接通时操作失误: {e}")
        # The connection is unusable. Hand it back with close=True so the
        # pool discards it; otherwise it stays counted as borrowed forever
        # and the pool eventually reports exhaustion.
        try:
            connection_pool.putconn(conn, close=True)
        except psycopg2.pool.PoolError:
            conn.close()
        return None
    return conn
def release_db_connection(conn):
    """Give a borrowed connection back to the pool.

    Args:
        conn: Connection previously obtained from get_db_connection(); a
            falsy value is logged and ignored.

    A warning is logged when ``conn`` or the pool is missing, or when the
    connection is already closed. If the pool rejects the connection, the
    error is logged and the connection is closed directly.
    """
    if not conn:
        logging.warning("释放数据库连接失败: 连接无效")
        return
    if not connection_pool:
        logging.warning("释放数据库连接失败: 连接池无效")
        return
    if conn.closed:
        logging.warning("释放数据库连接失败: 连接已关闭")
        # Fall through: the pool only forgets a borrowed connection when it
        # is passed to putconn(). Skipping that for closed connections would
        # leave the slot counted as in use until the pool is exhausted.
    try:
        connection_pool.putconn(conn)
    except psycopg2.pool.PoolError as e:
        logging.error(f"释放数据库连接失败: {e}")
        conn.close()
def create_tables():
    """Create the user_configs, user_sessions and bot_messages tables.

    Each table is created only if it does not exist yet. Returns silently
    when no connection is available; any error is logged rather than raised.
    The connection is always handed back to the pool.
    """
    connection = get_db_connection()
    if not connection:
        return

    # Statement text kept exactly as it appears in the source listing.
    schema_sql = """
CREATE TABLE IF NOT EXISTS user_configs (
user_id BIGINT,
chat_id BIGINT,
ai_api TEXT,
ai_model TEXT,
ai_key TEXT,
ai_prompt TEXT,
max_token INTEGER,
message_delete_delay INTEGER,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, chat_id)
);
CREATE TABLE IF NOT EXISTS user_sessions (
session_key TEXT PRIMARY KEY,
messages JSONB,
message_ids JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS bot_messages (
chat_id BIGINT,
message_id BIGINT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (chat_id, message_id)
);
"""
    try:
        with connection.cursor() as cur:
            cur.execute(schema_sql)
        connection.commit()
        logging.info("数据库表创建成功")
    except Exception as exc:
        logging.error(f"创建数据库表失败: {exc}")
    finally:
        release_db_connection(connection)
def reset_database(chat_id, user_id, call, bot, send_bot_message):
    """Drop all bot tables, recreate them, and report the result.

    Args:
        chat_id: Chat that receives the confirmation message.
        user_id: Only used in log messages.
        call: Callback object; its ``id`` is passed to
            ``bot.answer_callback_query``.
        bot: Client object providing ``answer_callback_query``.
        send_bot_message: Callable invoked as
            ``send_bot_message(bot, chat_id, text, parse_mode=...)``.

    Returns None. Failures are logged and reported through the callback
    answer; the connection is always handed back to the pool.
    """
    conn = get_db_connection()
    if not conn:
        logging.error("获取数据库连接失败,无法重置数据库")
        return
    try:
        with conn.cursor() as cursor:
            # SET LOCAL limits the timeout to this transaction. A plain SET
            # would survive the commit and stay on the pooled connection,
            # silently changing the timeout for whoever borrows it next.
            cursor.execute("SET LOCAL statement_timeout = 60000;")
            cursor.execute("DROP TABLE IF EXISTS user_configs CASCADE;")
            cursor.execute("DROP TABLE IF EXISTS user_sessions CASCADE;")
            cursor.execute("DROP TABLE IF EXISTS bot_messages CASCADE;")
        conn.commit()
        logging.info(f"用户 {user_id} 已删除所有数据库表")
        # create_tables() borrows its own connection from the pool.
        create_tables()
        logging.info(f"用户 {user_id} 已重新创建数据库表")
        bot.answer_callback_query(call.id, "数据库已重置,所有数据已清除")
        send_bot_message(bot, chat_id, "数据库已重置,所有数据已清除", parse_mode="MarkdownV2")
        logging.info(f"用户 {user_id} 重置了数据库,聊天 ID: {chat_id}")
    except Exception as e:
        logging.error(f"重置数据库失败: {e}")
        # NOTE(review): assumes `bot` is a Telegram Bot API client, whose
        # callback answers accept at most 200 characters; driver errors can
        # be much longer, so the text is truncated. Confirm against the caller.
        bot.answer_callback_query(call.id, f"重置数据库失败: {e}"[:200])
    finally:
        release_db_connection(conn)
@atexit.register
def close_connection_pool():
    """Close every connection held by the pool; registered to run at exit."""
    if not connection_pool:
        # No pool was ever created, so there is nothing to close.
        return
    connection_pool.closeall()
    logging.info("数据库连接池已关闭")
# Run as a script: set up the pool and, if that succeeds, create the tables.
if __name__ == "__main__" and initialize_connection_pool():
    create_tables()