File size: 4,065 Bytes
85494ee
a0ab3de
 
 
85494ee
a0ab3de
 
 
 
85494ee
a0ab3de
 
85494ee
 
a0ab3de
 
66bebb6
85494ee
 
a0ab3de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
85494ee
 
 
a0ab3de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
85494ee
a0ab3de
 
 
 
 
 
 
 
 
 
 
 
 
 
85494ee
 
a0ab3de
85494ee
 
 
a0ab3de
 
 
 
85494ee
a0ab3de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# database_sql.py
# ==========================================
# 🛡️ 稳定性优化:SQL 数据库连接模块
# ==========================================
import os
import time
import logging
from functools import wraps
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool, NullPool
from sqlalchemy.exc import OperationalError, InterfaceError, DBAPIError
from models_sql import Base

logger = logging.getLogger("ComfyUI-Ranking.Database")

# 核心:优先读取环境变量中的 PostgreSQL 数据库连接
SQLALCHEMY_DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:////tmp/comfy_financial.db")

# 🚀 P1性能优化:根据数据库类型配置连接池
if "sqlite" in SQLALCHEMY_DATABASE_URL:
    # SQLite:使用空连接池,单线程模式
    connect_args = {"check_same_thread": False}
    engine = create_engine(
        SQLALCHEMY_DATABASE_URL, 
        connect_args=connect_args,
        poolclass=NullPool  # SQLite 不需要连接池
    )
else:
    # PostgreSQL/MySQL:配置连接池参数
    engine = create_engine(
        SQLALCHEMY_DATABASE_URL,
        poolclass=QueuePool,
        pool_size=5,           # 核心连接数
        max_overflow=10,       # 超出 pool_size 后可创建的最大连接数
        pool_timeout=30,       # 获取连接超时(秒)
        pool_recycle=1800,     # 连接回收时间(30分钟),防止数据库断开
        pool_pre_ping=True     # 使用前检测连接有效性
    )

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


# ==========================================
# 🛡️ 稳定性优化:数据库操作重试装饰器
# ==========================================
def db_retry(max_retries: int = 3, delay: float = 0.5):
    """

    数据库操作重试装饰器

    

    用法:

        @db_retry(max_retries=3)

        def my_db_function(db):

            ...

    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except (OperationalError, InterfaceError, DBAPIError) as e:
                    last_error = e
                    if attempt < max_retries - 1:
                        wait_time = delay * (2 ** attempt)  # 指数退避
                        logger.warning(f"DB 操作失败 (第{attempt+1}次):{e}, {wait_time}秒后重试")
                        time.sleep(wait_time)
                    else:
                        logger.error(f"DB 操作失败 (重试{max_retries}次后放弃): {e}")
            raise last_error
        return wrapper
    return decorator


def init_sql_db():
    """初始化数据库,包含重试机制"""
    for attempt in range(3):
        try:
            Base.metadata.create_all(bind=engine)
            logger.info("数据库初始化成功")
            return
        except Exception as e:
            if attempt < 2:
                logger.warning(f"数据库初始化失败 (第{attempt+1}次): {e}")
                time.sleep(2 ** attempt)
            else:
                logger.error(f"数据库初始化失败 (已重试3次): {e}")
                raise


def get_db():
    """获取数据库会话,带连接有效性检测"""
    db = SessionLocal()
    try:
        yield db
    except (OperationalError, InterfaceError) as e:
        logger.error(f"数据库连接错误: {e}")
        db.rollback()
        raise
    finally:
        db.close()


def check_db_connection() -> bool:
    """

    检查数据库连接是否正常

    用于健康检查接口

    """
    try:
        db = SessionLocal()
        db.execute(text("SELECT 1"))
        db.close()
        return True
    except Exception as e:
        logger.error(f"数据库连接检查失败: {e}")
        return False