| | import os |
| | import logging |
| | import psycopg2 |
| | from psycopg2 import pool |
| | from urllib.parse import urlparse |
| | from dotenv import load_dotenv |
| | import atexit |
| | import time |
| |
|
| | load_dotenv() |
| |
|
| | supabase_text = os.environ.get("SUPABASE_TEXT") |
| | if not supabase_text: |
| | logging.error("未设置 SUPABASE_TEXT 环境变量") |
| | exit(1) |
| |
|
| | db_url = urlparse(supabase_text) |
| |
|
| | connection_pool = None |
| | initialized = False |
| |
|
| | def initialize_connection_pool(): |
| | global connection_pool, initialized |
| | if initialized: |
| | return True |
| |
|
| | retries = 3 |
| | for attempt in range(retries): |
| | try: |
| | connection_pool = pool.SimpleConnectionPool( |
| | minconn=1, |
| | maxconn=10, |
| | user=db_url.username, |
| | password=db_url.password, |
| | host=db_url.hostname, |
| | port=db_url.port, |
| | database=db_url.path[1:] |
| | ) |
| | logging.info("数据库连接池初始化成功") |
| | initialized = True |
| | return True |
| | except psycopg2.OperationalError as e: |
| | logging.error(f"数据库连接池初始化失败 (尝试 {attempt + 1}/{retries}): {e}") |
| | logging.exception(e) |
| | if attempt < retries - 1: |
| | time.sleep(5) |
| | else: |
| | return False |
| |
|
| | def get_db_connection(): |
| | global connection_pool, initialized |
| | if not initialized and not initialize_connection_pool(): |
| | return None |
| |
|
| | try: |
| | conn = connection_pool.getconn() |
| | if conn: |
| | with conn.cursor() as cursor: |
| | cursor.execute("SELECT 1") |
| | return conn |
| | except psycopg2.pool.PoolError as e: |
| | logging.error(f"获取数据库连接失败: {e}") |
| | logging.info("尝试重新初始化连接池...") |
| | initialized = False |
| | if initialize_connection_pool(): |
| | return get_db_connection() |
| | else: |
| | return None |
| | except psycopg2.OperationalError as e: |
| | logging.error(f"接通时操作失误: {e}") |
| | return None |
| |
|
| | def release_db_connection(conn): |
| | global connection_pool |
| | if conn and connection_pool and not conn.closed: |
| | try: |
| | connection_pool.putconn(conn) |
| | except psycopg2.pool.PoolError as e: |
| | logging.error(f"释放数据库连接失败: {e}") |
| | conn.close() |
| | else: |
| | if not conn: |
| | logging.warning("释放数据库连接失败: 连接无效") |
| | elif not connection_pool: |
| | logging.warning("释放数据库连接失败: 连接池无效") |
| | elif conn.closed: |
| | logging.warning("释放数据库连接失败: 连接已关闭") |
| |
|
| | def create_tables(): |
| | conn = get_db_connection() |
| | if not conn: |
| | return |
| |
|
| | try: |
| | with conn.cursor() as cursor: |
| | cursor.execute(""" |
| | CREATE TABLE IF NOT EXISTS user_configs ( |
| | user_id BIGINT, |
| | chat_id BIGINT, |
| | ai_api TEXT, |
| | ai_model TEXT, |
| | ai_key TEXT, |
| | ai_prompt TEXT, |
| | max_token INTEGER, |
| | message_delete_delay INTEGER, |
| | created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, |
| | PRIMARY KEY (user_id, chat_id) |
| | ); |
| | CREATE TABLE IF NOT EXISTS user_sessions ( |
| | session_key TEXT PRIMARY KEY, |
| | messages JSONB, |
| | message_ids JSONB, |
| | created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP |
| | ); |
| | CREATE TABLE IF NOT EXISTS bot_messages ( |
| | chat_id BIGINT, |
| | message_id BIGINT, |
| | created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, |
| | PRIMARY KEY (chat_id, message_id) |
| | ); |
| | """) |
| | conn.commit() |
| | logging.info("数据库表创建成功") |
| | except Exception as e: |
| | logging.error(f"创建数据库表失败: {e}") |
| | finally: |
| | release_db_connection(conn) |
| |
|
| | def reset_database(chat_id, user_id, call, bot, send_bot_message): |
| | conn = get_db_connection() |
| | if not conn: |
| | logging.error("获取数据库连接失败,无法重置数据库") |
| | return |
| |
|
| | try: |
| | with conn.cursor() as cursor: |
| | cursor.execute("SET statement_timeout = 60000;") |
| | cursor.execute("DROP TABLE IF EXISTS user_configs CASCADE;") |
| | cursor.execute("DROP TABLE IF EXISTS user_sessions CASCADE;") |
| | cursor.execute("DROP TABLE IF EXISTS bot_messages CASCADE;") |
| | conn.commit() |
| | logging.info(f"用户 {user_id} 已删除所有数据库表") |
| |
|
| | create_tables() |
| | logging.info(f"用户 {user_id} 已重新创建数据库表") |
| |
|
| | bot.answer_callback_query(call.id, "数据库已重置,所有数据已清除") |
| | send_bot_message(bot, chat_id, "数据库已重置,所有数据已清除", parse_mode="MarkdownV2") |
| | logging.info(f"用户 {user_id} 重置了数据库,聊天 ID: {chat_id}") |
| | except Exception as e: |
| | logging.error(f"重置数据库失败: {e}") |
| | bot.answer_callback_query(call.id, f"重置数据库失败: {e}") |
| | finally: |
| | release_db_connection(conn) |
| |
|
| | @atexit.register |
| | def close_connection_pool(): |
| | global connection_pool |
| | if connection_pool: |
| | connection_pool.closeall() |
| | logging.info("数据库连接池已关闭") |
| |
|
| | if __name__ == "__main__": |
| | if initialize_connection_pool(): |
| | create_tables() |