Buckets:
| import pkgutil | |
| from importlib import import_module | |
| from django.conf import settings | |
| from django.core.exceptions import ImproperlyConfigured | |
| # For backwards compatibility with Django < 3.2 | |
| from django.utils.connection import ConnectionDoesNotExist # NOQA: F401 | |
| from django.utils.connection import BaseConnectionHandler | |
| from django.utils.functional import cached_property | |
| from django.utils.module_loading import import_string | |
| DEFAULT_DB_ALIAS = "default" | |
| DJANGO_VERSION_PICKLE_KEY = "_django_version" | |
| class Error(Exception): | |
| pass | |
| class InterfaceError(Error): | |
| pass | |
| class DatabaseError(Error): | |
| pass | |
| class DataError(DatabaseError): | |
| pass | |
| class OperationalError(DatabaseError): | |
| pass | |
| class IntegrityError(DatabaseError): | |
| pass | |
| class InternalError(DatabaseError): | |
| pass | |
| class ProgrammingError(DatabaseError): | |
| pass | |
| class NotSupportedError(DatabaseError): | |
| pass | |
| class DatabaseErrorWrapper: | |
| """ | |
| Context manager and decorator that reraises backend-specific database | |
| exceptions using Django's common wrappers. | |
| """ | |
| def __init__(self, wrapper): | |
| """ | |
| wrapper is a database wrapper. | |
| It must have a Database attribute defining PEP-249 exceptions. | |
| """ | |
| self.wrapper = wrapper | |
| def __del__(self): | |
| del self.wrapper | |
| def __enter__(self): | |
| pass | |
| def __exit__(self, exc_type, exc_value, traceback): | |
| if exc_type is None: | |
| return | |
| for dj_exc_type in ( | |
| DataError, | |
| OperationalError, | |
| IntegrityError, | |
| InternalError, | |
| ProgrammingError, | |
| NotSupportedError, | |
| DatabaseError, | |
| InterfaceError, | |
| Error, | |
| ): | |
| db_exc_type = getattr(self.wrapper.Database, dj_exc_type.__name__) | |
| if issubclass(exc_type, db_exc_type): | |
| dj_exc_value = dj_exc_type(*exc_value.args) | |
| # Only set the 'errors_occurred' flag for errors that may make | |
| # the connection unusable. | |
| if dj_exc_type not in (DataError, IntegrityError): | |
| self.wrapper.errors_occurred = True | |
| raise dj_exc_value.with_traceback(traceback) from exc_value | |
| def __call__(self, func): | |
| # Note that we are intentionally not using @wraps here for performance | |
| # reasons. Refs #21109. | |
| def inner(*args, **kwargs): | |
| with self: | |
| return func(*args, **kwargs) | |
| return inner | |
| def load_backend(backend_name): | |
| """ | |
| Return a database backend's "base" module given a fully qualified database | |
| backend name, or raise an error if it doesn't exist. | |
| """ | |
| # This backend was renamed in Django 1.9. | |
| if backend_name == "django.db.backends.postgresql_psycopg2": | |
| backend_name = "django.db.backends.postgresql" | |
| try: | |
| return import_module("%s.base" % backend_name) | |
| except ImportError as e_user: | |
| # The database backend wasn't found. Display a helpful error message | |
| # listing all built-in database backends. | |
| import django.db.backends | |
| builtin_backends = [ | |
| name | |
| for _, name, ispkg in pkgutil.iter_modules(django.db.backends.__path__) | |
| if ispkg and name not in {"base", "dummy"} | |
| ] | |
| if backend_name not in ["django.db.backends.%s" % b for b in builtin_backends]: | |
| backend_reprs = map(repr, sorted(builtin_backends)) | |
| raise ImproperlyConfigured( | |
| "%r isn't an available database backend or couldn't be " | |
| "imported. Check the above exception. To use one of the " | |
| "built-in backends, use 'django.db.backends.XXX', where XXX " | |
| "is one of:\n" | |
| " %s" % (backend_name, ", ".join(backend_reprs)) | |
| ) from e_user | |
| else: | |
| # If there's some other error, this must be an error in Django | |
| raise | |
| class ConnectionHandler(BaseConnectionHandler): | |
| settings_name = "DATABASES" | |
| # Connections needs to still be an actual thread local, as it's truly | |
| # thread-critical. Database backends should use @async_unsafe to protect | |
| # their code from async contexts, but this will give those contexts | |
| # separate connections in case it's needed as well. There's no cleanup | |
| # after async contexts, though, so we don't allow that if we can help it. | |
| thread_critical = True | |
| def configure_settings(self, databases): | |
| databases = super().configure_settings(databases) | |
| if databases == {}: | |
| databases[DEFAULT_DB_ALIAS] = {"ENGINE": "django.db.backends.dummy"} | |
| elif DEFAULT_DB_ALIAS not in databases: | |
| raise ImproperlyConfigured( | |
| f"You must define a '{DEFAULT_DB_ALIAS}' database." | |
| ) | |
| elif databases[DEFAULT_DB_ALIAS] == {}: | |
| databases[DEFAULT_DB_ALIAS]["ENGINE"] = "django.db.backends.dummy" | |
| # Configure default settings. | |
| for conn in databases.values(): | |
| conn.setdefault("ATOMIC_REQUESTS", False) | |
| conn.setdefault("AUTOCOMMIT", True) | |
| conn.setdefault("ENGINE", "django.db.backends.dummy") | |
| if conn["ENGINE"] == "django.db.backends." or not conn["ENGINE"]: | |
| conn["ENGINE"] = "django.db.backends.dummy" | |
| conn.setdefault("CONN_MAX_AGE", 0) | |
| conn.setdefault("CONN_HEALTH_CHECKS", False) | |
| conn.setdefault("OPTIONS", {}) | |
| conn.setdefault("TIME_ZONE", None) | |
| for setting in ["NAME", "USER", "PASSWORD", "HOST", "PORT"]: | |
| conn.setdefault(setting, "") | |
| test_settings = conn.setdefault("TEST", {}) | |
| default_test_settings = [ | |
| ("CHARSET", None), | |
| ("COLLATION", None), | |
| ("MIGRATE", True), | |
| ("MIRROR", None), | |
| ("NAME", None), | |
| ] | |
| for key, value in default_test_settings: | |
| test_settings.setdefault(key, value) | |
| return databases | |
| def databases(self): | |
| # Maintained for backward compatibility as some 3rd party packages have | |
| # made use of this private API in the past. It is no longer used within | |
| # Django itself. | |
| return self.settings | |
| def create_connection(self, alias): | |
| db = self.settings[alias] | |
| backend = load_backend(db["ENGINE"]) | |
| return backend.DatabaseWrapper(db, alias) | |
| class ConnectionRouter: | |
| def __init__(self, routers=None): | |
| """ | |
| If routers is not specified, default to settings.DATABASE_ROUTERS. | |
| """ | |
| self._routers = routers | |
| def routers(self): | |
| if self._routers is None: | |
| self._routers = settings.DATABASE_ROUTERS | |
| routers = [] | |
| for r in self._routers: | |
| if isinstance(r, str): | |
| router = import_string(r)() | |
| else: | |
| router = r | |
| routers.append(router) | |
| return routers | |
| def _router_func(action): | |
| def _route_db(self, model, **hints): | |
| chosen_db = None | |
| for router in self.routers: | |
| try: | |
| method = getattr(router, action) | |
| except AttributeError: | |
| # If the router doesn't have a method, skip to the next | |
| # one. | |
| pass | |
| else: | |
| chosen_db = method(model, **hints) | |
| if chosen_db: | |
| return chosen_db | |
| instance = hints.get("instance") | |
| if instance is not None and instance._state.db: | |
| return instance._state.db | |
| return DEFAULT_DB_ALIAS | |
| return _route_db | |
| db_for_read = _router_func("db_for_read") | |
| db_for_write = _router_func("db_for_write") | |
| def allow_relation(self, obj1, obj2, **hints): | |
| for router in self.routers: | |
| try: | |
| method = router.allow_relation | |
| except AttributeError: | |
| # If the router doesn't have a method, skip to the next one. | |
| pass | |
| else: | |
| allow = method(obj1, obj2, **hints) | |
| if allow is not None: | |
| return allow | |
| return obj1._state.db == obj2._state.db | |
| def allow_migrate(self, db, app_label, **hints): | |
| for router in self.routers: | |
| try: | |
| method = router.allow_migrate | |
| except AttributeError: | |
| # If the router doesn't have a method, skip to the next one. | |
| continue | |
| allow = method(db, app_label, **hints) | |
| if allow is not None: | |
| return allow | |
| return True | |
| def allow_migrate_model(self, db, model): | |
| return self.allow_migrate( | |
| db, | |
| model._meta.app_label, | |
| model_name=model._meta.model_name, | |
| model=model, | |
| ) | |
| def get_migratable_models(self, app_config, db, include_auto_created=False): | |
| """Return app models allowed to be migrated on provided db.""" | |
| models = app_config.get_models(include_auto_created=include_auto_created) | |
| return [model for model in models if self.allow_migrate_model(db, model)] | |
Xet Storage Details
- Size:
- 9.35 kB
- Xet hash:
- cc1270c3b47318bb4261d3a4a0793224cbfe47e31d4e7a2835afd1d1bfede448
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.