Spaces:
Paused
Paused
File size: 5,168 Bytes
55bd140 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
import os
import json
import logging
from contextlib import contextmanager
from typing import Any, Optional
from open_webui.internal.wrappers import register_connection
from open_webui.env import (
OPEN_WEBUI_DIR,
DATABASE_URL,
DATABASE_SCHEMA,
SRC_LOG_LEVELS,
DATABASE_POOL_MAX_OVERFLOW,
DATABASE_POOL_RECYCLE,
DATABASE_POOL_SIZE,
DATABASE_POOL_TIMEOUT,
DATABASE_ENABLE_SQLITE_WAL,
)
from peewee_migrate import Router
from sqlalchemy import Dialect, create_engine, MetaData, event, types
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import QueuePool, NullPool
from sqlalchemy.sql.type_api import _T
from typing_extensions import Self
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["DB"])
class JSONField(types.TypeDecorator):
impl = types.Text
cache_ok = True
def process_bind_param(self, value: Optional[_T], dialect: Dialect) -> Any:
return json.dumps(value)
def process_result_value(self, value: Optional[_T], dialect: Dialect) -> Any:
if value is not None:
return json.loads(value)
def copy(self, **kw: Any) -> Self:
return JSONField(self.impl.length)
def db_value(self, value):
return json.dumps(value)
def python_value(self, value):
if value is not None:
return json.loads(value)
# Workaround to handle the peewee migration
# This is required to ensure the peewee migration is handled before the alembic migration
def handle_peewee_migration(DATABASE_URL):
# db = None
try:
# Replace the postgresql:// with postgres:// to handle the peewee migration
db = register_connection(DATABASE_URL.replace("postgresql://", "postgres://"))
migrate_dir = OPEN_WEBUI_DIR / "internal" / "migrations"
router = Router(db, logger=log, migrate_dir=migrate_dir)
router.run()
db.close()
except Exception as e:
log.error(f"Failed to initialize the database connection: {e}")
log.warning(
"Hint: If your database password contains special characters, you may need to URL-encode it."
)
raise
finally:
# Properly closing the database connection
if db and not db.is_closed():
db.close()
# Assert if db connection has been closed
assert db.is_closed(), "Database connection is still open."
handle_peewee_migration(DATABASE_URL)
SQLALCHEMY_DATABASE_URL = DATABASE_URL
# Handle SQLCipher URLs
if SQLALCHEMY_DATABASE_URL.startswith("sqlite+sqlcipher://"):
database_password = os.environ.get("DATABASE_PASSWORD")
if not database_password or database_password.strip() == "":
raise ValueError(
"DATABASE_PASSWORD is required when using sqlite+sqlcipher:// URLs"
)
# Extract database path from SQLCipher URL
db_path = SQLALCHEMY_DATABASE_URL.replace("sqlite+sqlcipher://", "")
if db_path.startswith("/"):
db_path = db_path[1:] # Remove leading slash for relative paths
# Create a custom creator function that uses sqlcipher3
def create_sqlcipher_connection():
import sqlcipher3
conn = sqlcipher3.connect(db_path, check_same_thread=False)
conn.execute(f"PRAGMA key = '{database_password}'")
return conn
engine = create_engine(
"sqlite://", # Dummy URL since we're using creator
creator=create_sqlcipher_connection,
echo=False,
)
log.info("Connected to encrypted SQLite database using SQLCipher")
elif "sqlite" in SQLALCHEMY_DATABASE_URL:
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
def on_connect(dbapi_connection, connection_record):
cursor = dbapi_connection.cursor()
if DATABASE_ENABLE_SQLITE_WAL:
cursor.execute("PRAGMA journal_mode=WAL")
else:
cursor.execute("PRAGMA journal_mode=DELETE")
cursor.close()
event.listen(engine, "connect", on_connect)
else:
if isinstance(DATABASE_POOL_SIZE, int):
if DATABASE_POOL_SIZE > 0:
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
pool_size=DATABASE_POOL_SIZE,
max_overflow=DATABASE_POOL_MAX_OVERFLOW,
pool_timeout=DATABASE_POOL_TIMEOUT,
pool_recycle=DATABASE_POOL_RECYCLE,
pool_pre_ping=True,
poolclass=QueuePool,
)
else:
engine = create_engine(
SQLALCHEMY_DATABASE_URL, pool_pre_ping=True, poolclass=NullPool
)
else:
engine = create_engine(SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(
autocommit=False, autoflush=False, bind=engine, expire_on_commit=False
)
metadata_obj = MetaData(schema=DATABASE_SCHEMA)
Base = declarative_base(metadata=metadata_obj)
Session = scoped_session(SessionLocal)
def get_session():
db = SessionLocal()
try:
yield db
finally:
db.close()
get_db = contextmanager(get_session)
|