# aibot / APP / db_utils.py
# 3v324v23's picture
# test
# 43e4c76
import os
import logging
import psycopg2
from psycopg2 import pool
from urllib.parse import urlparse
from dotenv import load_dotenv
import atexit
import time
# Load variables from a local .env file (if any) into the process environment.
load_dotenv()

# Database connection URL; its username, password, host, port and path are
# used as the connection parameters when the pool is created.
supabase_text = os.environ.get("SUPABASE_TEXT")
if not supabase_text:
    logging.error("未设置 SUPABASE_TEXT 环境变量")
    # Raise SystemExit directly: the bare `exit` helper is injected by the
    # `site` module for interactive use and is not guaranteed to exist
    # (e.g. under `python -S` or in frozen applications).
    raise SystemExit(1)

db_url = urlparse(supabase_text)

# Module-wide pool state, managed by initialize_connection_pool().
connection_pool = None
initialized = False
def initialize_connection_pool():
    """Create the module-wide connection pool if it does not exist yet.

    Up to three attempts are made, sleeping five seconds between failed
    attempts. Only ``psycopg2.OperationalError`` is retried; any other
    exception propagates to the caller.

    Returns:
        True if the pool is (or already was) initialized, False once the
        final attempt has failed.
    """
    global connection_pool, initialized

    if initialized:
        return True

    retries = 3
    final_attempt = retries - 1

    # Connection parameters come from the parsed database URL.
    pool_kwargs = dict(
        minconn=1,
        maxconn=10,
        user=db_url.username,
        password=db_url.password,
        host=db_url.hostname,
        port=db_url.port,
        database=db_url.path[1:],  # strip the leading "/" of the URL path
    )

    for attempt in range(retries):
        try:
            connection_pool = pool.SimpleConnectionPool(**pool_kwargs)
        except psycopg2.OperationalError as e:
            logging.error(f"数据库连接池初始化失败 (尝试 {attempt + 1}/{retries}): {e}")
            logging.exception(e)
            if attempt >= final_attempt:
                return False
            time.sleep(5)
        else:
            logging.info("数据库连接池初始化成功")
            initialized = True
            return True
def get_db_connection():
    """Borrow a verified connection from the pool.

    The pool is initialized on first use. Each borrowed connection is probed
    with ``SELECT 1`` before being handed out. If the pool itself reports an
    error (``PoolError``), the pool is rebuilt and the request is retried by
    calling this function again.

    Returns:
        An open connection, or None if the pool could not be initialized or
        the connection failed with ``psycopg2.OperationalError``.
    """
    global connection_pool, initialized

    if not initialized and not initialize_connection_pool():
        return None

    conn = None
    try:
        conn = connection_pool.getconn()
        if conn:
            # Cheap round-trip to detect a connection the server has dropped.
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
            return conn
        return None
    except psycopg2.pool.PoolError as e:
        logging.error(f"获取数据库连接失败: {e}")
        logging.info("尝试重新初始化连接池...")
        initialized = False
        if initialize_connection_pool():
            return get_db_connection()
        return None
    except psycopg2.OperationalError as e:
        logging.error(f"接通时操作失误: {e}")
        if conn is not None:
            # The probe failed on a connection that is still checked out.
            # Hand it back with close=True so the pool frees its slot and
            # discards the dead connection instead of leaking it.
            try:
                connection_pool.putconn(conn, close=True)
            except psycopg2.pool.PoolError:
                conn.close()
        return None
def release_db_connection(conn):
    """Return *conn* to the connection pool.

    Args:
        conn: A connection previously obtained from get_db_connection().
            A falsy value is logged as a warning and ignored.

    A connection that is already closed is still handed back to the pool so
    the pool can drop its bookkeeping entry; otherwise the slot would stay
    marked as in use. If the pool rejects the connection (``PoolError``), the
    error is logged and the connection is closed directly.
    """
    if not conn:
        logging.warning("释放数据库连接失败: 连接无效")
        return
    if not connection_pool:
        logging.warning("释放数据库连接失败: 连接池无效")
        return

    if conn.closed:
        # Keep the warning, but fall through: the pool must still be told
        # that this connection is no longer checked out.
        logging.warning("释放数据库连接失败: 连接已关闭")

    try:
        connection_pool.putconn(conn)
    except psycopg2.pool.PoolError as e:
        logging.error(f"释放数据库连接失败: {e}")
        conn.close()
def create_tables():
    """Create the ``user_configs``, ``user_sessions`` and ``bot_messages``
    tables if they do not already exist.

    Returns None without doing anything when no connection can be obtained.
    Any failure while executing or committing is logged, not raised. The
    connection is always released afterwards.
    """
    db_conn = get_db_connection()
    if not db_conn:
        return

    schema_sql = """
CREATE TABLE IF NOT EXISTS user_configs (
user_id BIGINT,
chat_id BIGINT,
ai_api TEXT,
ai_model TEXT,
ai_key TEXT,
ai_prompt TEXT,
max_token INTEGER,
message_delete_delay INTEGER,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, chat_id)
);
CREATE TABLE IF NOT EXISTS user_sessions (
session_key TEXT PRIMARY KEY,
messages JSONB,
message_ids JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS bot_messages (
chat_id BIGINT,
message_id BIGINT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (chat_id, message_id)
);
"""

    try:
        with db_conn.cursor() as cur:
            cur.execute(schema_sql)
        db_conn.commit()
        logging.info("数据库表创建成功")
    except Exception as e:
        logging.error(f"创建数据库表失败: {e}")
    finally:
        release_db_connection(db_conn)
def reset_database(chat_id, user_id, call, bot, send_bot_message):
    """Drop all bot tables, recreate them, and notify the requesting user.

    Args:
        chat_id: Chat that receives the confirmation message.
        user_id: Identifier of the requesting user (used for logging only).
        call: Callback query object; its ``id`` is used to answer the query.
        bot: Bot object providing ``answer_callback_query``.
        send_bot_message: Callable invoked as
            ``send_bot_message(bot, chat_id, text, parse_mode=...)``.

    Returns None. If no connection can be obtained, the error is logged and
    nothing else happens. Any exception during the reset or the notification
    is logged and reported through the callback query. The connection is
    always released.
    """
    conn = get_db_connection()
    if not conn:
        logging.error("获取数据库连接失败,无法重置数据库")
        return
    try:
        with conn.cursor() as cursor:
            # SET LOCAL limits the timeout to this transaction. A plain SET is
            # session-scoped and would stay on the pooled connection after
            # commit, silently affecting whoever borrows it next.
            cursor.execute("SET LOCAL statement_timeout = 60000;")
            cursor.execute("DROP TABLE IF EXISTS user_configs CASCADE;")
            cursor.execute("DROP TABLE IF EXISTS user_sessions CASCADE;")
            cursor.execute("DROP TABLE IF EXISTS bot_messages CASCADE;")
        conn.commit()
        logging.info(f"用户 {user_id} 已删除所有数据库表")
        create_tables()
        logging.info(f"用户 {user_id} 已重新创建数据库表")
        bot.answer_callback_query(call.id, "数据库已重置,所有数据已清除")
        send_bot_message(bot, chat_id, "数据库已重置,所有数据已清除", parse_mode="MarkdownV2")
        logging.info(f"用户 {user_id} 重置了数据库,聊天 ID: {chat_id}")
    except Exception as e:
        logging.error(f"重置数据库失败: {e}")
        bot.answer_callback_query(call.id, f"重置数据库失败: {e}")
    finally:
        release_db_connection(conn)
@atexit.register
def close_connection_pool():
    """Close every connection held by the pool.

    Registered with ``atexit`` so it runs at interpreter shutdown. Does
    nothing when no pool has been created.
    """
    if not connection_pool:
        return
    connection_pool.closeall()
    logging.info("数据库连接池已关闭")
# When run as a script: set up the pool and, if that succeeds, create the tables.
if __name__ == "__main__" and initialize_connection_pool():
    create_tables()