"""PostgreSQL connection-pool helpers and schema management.

The database location is read from the ``SUPABASE_TEXT`` environment
variable (a ``postgresql://user:password@host:port/dbname`` style URL).
Importing this module terminates the process with exit status 1 when the
variable is missing.
"""
import atexit
import logging
import os
import time
from urllib.parse import urlparse

import psycopg2
from dotenv import load_dotenv
from psycopg2 import pool

load_dotenv()

supabase_text = os.environ.get("SUPABASE_TEXT")
if not supabase_text:
    logging.error("未设置 SUPABASE_TEXT 环境变量")
    # ``exit`` is injected by the ``site`` module and is absent under
    # ``python -S`` or in frozen builds; SystemExit is always available.
    raise SystemExit(1)

# NOTE(review): urlparse() does not percent-decode ``username``/``password``.
# Confirm that credentials containing reserved characters are not stored
# URL-encoded in SUPABASE_TEXT.
db_url = urlparse(supabase_text)

# Module-wide pool state, managed by initialize_connection_pool().
connection_pool = None
initialized = False

# Upper bound on getconn()/probe rounds inside get_db_connection().
_MAX_CONNECTION_ATTEMPTS = 3

_CREATE_TABLES_SQL = """
    CREATE TABLE IF NOT EXISTS user_configs (
        user_id BIGINT,
        chat_id BIGINT,
        ai_api TEXT,
        ai_model TEXT,
        ai_key TEXT,
        ai_prompt TEXT,
        max_token INTEGER,
        message_delete_delay INTEGER,
        created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (user_id, chat_id)
    );
    CREATE TABLE IF NOT EXISTS user_sessions (
        session_key TEXT PRIMARY KEY,
        messages JSONB,
        message_ids JSONB,
        created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
    );
    CREATE TABLE IF NOT EXISTS bot_messages (
        chat_id BIGINT,
        message_id BIGINT,
        created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (chat_id, message_id)
    );
"""


def initialize_connection_pool(retries=3, retry_delay=5):
    """Create the module-wide connection pool if it does not exist yet.

    Args:
        retries: Maximum number of connection attempts.
        retry_delay: Seconds to sleep between failed attempts.

    Returns:
        True when a pool is available (already initialised or just
        created), False when every attempt raised
        ``psycopg2.OperationalError``.
    """
    global connection_pool, initialized
    if initialized:
        return True

    for attempt in range(retries):
        try:
            connection_pool = pool.SimpleConnectionPool(
                minconn=1,
                maxconn=10,
                user=db_url.username,
                password=db_url.password,
                host=db_url.hostname,
                port=db_url.port,
                database=db_url.path[1:],  # strip the leading "/"
            )
        except psycopg2.OperationalError as e:
            # One ERROR record that carries both the message and traceback.
            logging.exception(
                f"数据库连接池初始化失败 (尝试 {attempt + 1}/{retries}): {e}"
            )
            if attempt < retries - 1:
                time.sleep(retry_delay)
        else:
            logging.info("数据库连接池初始化成功")
            initialized = True
            return True
    return False


def _discard_connection(conn):
    """Hand a broken connection back to the pool so its slot is freed."""
    try:
        connection_pool.putconn(conn, close=True)
    except psycopg2.Error:
        # The pool refused it (closed pool / unknown connection); make sure
        # the socket is released anyway.
        conn.close()


def get_db_connection():
    """Borrow a verified connection from the pool.

    The connection is probed with ``SELECT 1`` before it is returned. A
    connection that fails the probe is discarded and another one is
    requested, up to ``_MAX_CONNECTION_ATTEMPTS`` rounds. When the pool
    itself raises ``PoolError`` it is rebuilt at most once per call.

    Returns:
        An open psycopg2 connection with no transaction in progress, or
        None when no usable connection could be obtained. Callers must
        give it back with ``release_db_connection``.
    """
    global initialized
    if not initialized and not initialize_connection_pool():
        return None

    pool_rebuilt = False
    for _ in range(_MAX_CONNECTION_ATTEMPTS):
        try:
            conn = connection_pool.getconn()
        except psycopg2.pool.PoolError as e:
            logging.error(f"获取数据库连接失败: {e}")
            if pool_rebuilt:
                # A fresh pool failed as well; give up instead of looping.
                return None
            logging.info("尝试重新初始化连接池...")
            initialized = False
            if not initialize_connection_pool():
                return None
            pool_rebuilt = True
            continue
        except psycopg2.OperationalError as e:
            logging.error(f"接通时操作失误: {e}")
            return None

        try:
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
            # End the probe's implicit transaction so the caller starts clean.
            conn.rollback()
        except (psycopg2.OperationalError, psycopg2.InterfaceError) as e:
            logging.error(f"接通时操作失误: {e}")
            # Without this the dead connection would stay checked out forever.
            _discard_connection(conn)
            continue
        return conn
    return None


def release_db_connection(conn):
    """Return a connection obtained from ``get_db_connection`` to the pool.

    Closed connections are still handed to the pool so that it stops
    counting them as in use. If the pool rejects the connection, or no pool
    exists, the connection is closed directly.
    """
    if not conn:
        logging.warning("释放数据库连接失败: 连接无效")
        return
    if not connection_pool:
        logging.warning("释放数据库连接失败: 连接池无效")
        conn.close()  # nothing to return it to; do not leak the socket
        return

    already_closed = bool(conn.closed)
    if already_closed:
        logging.warning("释放数据库连接失败: 连接已关闭")
    try:
        connection_pool.putconn(conn, close=already_closed)
    except psycopg2.Error as e:
        logging.error(f"释放数据库连接失败: {e}")
        conn.close()


def create_tables():
    """Create the bot's tables if they do not exist.

    Returns:
        True when the schema statements were committed, False when no
        connection was available or the statements failed (the error is
        logged, not raised).
    """
    conn = get_db_connection()
    if not conn:
        return False
    try:
        with conn.cursor() as cursor:
            cursor.execute(_CREATE_TABLES_SQL)
        conn.commit()
        logging.info("数据库表创建成功")
        return True
    except Exception as e:
        logging.error(f"创建数据库表失败: {e}")
        return False
    finally:
        release_db_connection(conn)


def reset_database(chat_id, user_id, call, bot, send_bot_message):
    """Drop and recreate every table, then report the outcome to the user.

    Args:
        chat_id: Chat that receives the confirmation message.
        user_id: User who requested the reset (used for logging only).
        call: Callback query being answered; only ``call.id`` is read.
        bot: Bot object providing ``answer_callback_query``.
        send_bot_message: Callable invoked as
            ``send_bot_message(bot, chat_id, text, parse_mode=...)``.
    """
    conn = get_db_connection()
    if not conn:
        logging.error("获取数据库连接失败,无法重置数据库")
        return
    try:
        with conn.cursor() as cursor:
            # SET LOCAL confines the timeout to this transaction; a plain SET
            # would stick to the pooled connection and hit later queries.
            cursor.execute("SET LOCAL statement_timeout = 60000;")
            cursor.execute("DROP TABLE IF EXISTS user_configs CASCADE;")
            cursor.execute("DROP TABLE IF EXISTS user_sessions CASCADE;")
            cursor.execute("DROP TABLE IF EXISTS bot_messages CASCADE;")
        conn.commit()
        logging.info(f"用户 {user_id} 已删除所有数据库表")

        # create_tables() logs and swallows its own errors, so check the
        # result instead of announcing success unconditionally.
        if not create_tables():
            raise RuntimeError("重新创建数据库表失败")
        logging.info(f"用户 {user_id} 已重新创建数据库表")

        bot.answer_callback_query(call.id, "数据库已重置,所有数据已清除")
        send_bot_message(
            bot, chat_id, "数据库已重置,所有数据已清除", parse_mode="MarkdownV2"
        )
        logging.info(f"用户 {user_id} 重置了数据库,聊天 ID: {chat_id}")
    except Exception as e:
        logging.error(f"重置数据库失败: {e}")
        bot.answer_callback_query(call.id, f"重置数据库失败: {e}")
    finally:
        release_db_connection(conn)


@atexit.register
def close_connection_pool():
    """Close every pooled connection; safe to call more than once."""
    global initialized
    # closeall() raises PoolError on an already-closed pool, which would
    # surface at interpreter exit after a manual call.
    if connection_pool and not connection_pool.closed:
        connection_pool.closeall()
        initialized = False
        logging.info("数据库连接池已关闭")


if __name__ == "__main__":
    if initialize_connection_pool():
        create_tables()