Buckets:
ktongue/docker_container / simsite /venv /lib /python3.14 /site-packages /django /utils /connection.py
| from asgiref.local import Local | |
| from django.conf import settings as django_settings | |
| from django.utils.functional import cached_property | |
| class ConnectionProxy: | |
| """Proxy for accessing a connection object's attributes.""" | |
| def __init__(self, connections, alias): | |
| self.__dict__["_connections"] = connections | |
| self.__dict__["_alias"] = alias | |
| def __getattr__(self, item): | |
| return getattr(self._connections[self._alias], item) | |
| def __setattr__(self, name, value): | |
| return setattr(self._connections[self._alias], name, value) | |
| def __delattr__(self, name): | |
| return delattr(self._connections[self._alias], name) | |
| def __contains__(self, key): | |
| return key in self._connections[self._alias] | |
| def __eq__(self, other): | |
| return self._connections[self._alias] == other | |
| class ConnectionDoesNotExist(Exception): | |
| pass | |
| class BaseConnectionHandler: | |
| settings_name = None | |
| exception_class = ConnectionDoesNotExist | |
| thread_critical = False | |
| def __init__(self, settings=None): | |
| self._settings = settings | |
| self._connections = Local(self.thread_critical) | |
| def settings(self): | |
| self._settings = self.configure_settings(self._settings) | |
| return self._settings | |
| def configure_settings(self, settings): | |
| if settings is None: | |
| settings = getattr(django_settings, self.settings_name) | |
| return settings | |
| def create_connection(self, alias): | |
| raise NotImplementedError("Subclasses must implement create_connection().") | |
| def __getitem__(self, alias): | |
| try: | |
| return getattr(self._connections, alias) | |
| except AttributeError: | |
| if alias not in self.settings: | |
| raise self.exception_class(f"The connection '{alias}' doesn't exist.") | |
| conn = self.create_connection(alias) | |
| setattr(self._connections, alias, conn) | |
| return conn | |
| def __setitem__(self, key, value): | |
| setattr(self._connections, key, value) | |
| def __delitem__(self, key): | |
| delattr(self._connections, key) | |
| def __iter__(self): | |
| return iter(self.settings) | |
| def all(self, initialized_only=False): | |
| return [ | |
| self[alias] | |
| for alias in self | |
| # If initialized_only is True, return only initialized connections. | |
| if not initialized_only or hasattr(self._connections, alias) | |
| ] | |
| def close_all(self): | |
| for conn in self.all(initialized_only=True): | |
| conn.close() | |
Xet Storage Details
- Size:
- 2.55 kB
- Xet hash:
- 3e7c450b9b3eeea198eb822855440bc2fdabb2fb91fdf6c745b66b8c1ca6c499
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.