Buckets:
ktongue/docker_container / simsite /venv /lib /python3.14 /site-packages /django /db /models /base.py
| import copy | |
| import inspect | |
| import warnings | |
| from collections import defaultdict | |
| from functools import partialmethod | |
| from itertools import chain | |
| from asgiref.sync import sync_to_async | |
| import django | |
| from django.apps import apps | |
| from django.conf import settings | |
| from django.core import checks | |
| from django.core.exceptions import ( | |
| NON_FIELD_ERRORS, | |
| FieldDoesNotExist, | |
| FieldError, | |
| MultipleObjectsReturned, | |
| ObjectDoesNotExist, | |
| ObjectNotUpdated, | |
| ValidationError, | |
| ) | |
| from django.db import ( | |
| DJANGO_VERSION_PICKLE_KEY, | |
| DatabaseError, | |
| connection, | |
| connections, | |
| router, | |
| transaction, | |
| ) | |
| from django.db.models import NOT_PROVIDED, ExpressionWrapper, IntegerField, Max, Value | |
| from django.db.models.constants import LOOKUP_SEP | |
| from django.db.models.deletion import CASCADE, Collector | |
| from django.db.models.expressions import DatabaseDefault | |
| from django.db.models.fields.composite import CompositePrimaryKey | |
| from django.db.models.fields.related import ( | |
| ForeignObjectRel, | |
| OneToOneField, | |
| lazy_related_operation, | |
| resolve_relation, | |
| ) | |
| from django.db.models.functions import Coalesce | |
| from django.db.models.manager import Manager | |
| from django.db.models.options import Options | |
| from django.db.models.query import F, Q | |
| from django.db.models.signals import ( | |
| class_prepared, | |
| post_init, | |
| post_save, | |
| pre_init, | |
| pre_save, | |
| ) | |
| from django.db.models.utils import AltersData, make_model_tuple | |
| from django.utils.encoding import force_str | |
| from django.utils.hashable import make_hashable | |
| from django.utils.text import capfirst, get_text_list | |
| from django.utils.translation import gettext_lazy as _ | |
class Deferred:
    """Sentinel type marking a model field value not loaded from the database."""

    def __repr__(self):
        return "<Deferred field>"

    # str() output is identical to repr() for this sentinel.
    __str__ = __repr__


# Shared singleton; field values are compared against it with `is`.
DEFERRED = Deferred()
def subclass_exception(name, bases, module, attached_to):
    """
    Create exception subclass. Used by ModelBase below.

    The exception is created in a way that allows it to be pickled, assuming
    that the returned exception class will be added as an attribute to the
    'attached_to' class.
    """
    # Qualifying the name under the owner class keeps pickle able to find it.
    qualname = "%s.%s" % (attached_to.__qualname__, name)
    return type(name, bases, {"__module__": module, "__qualname__": qualname})
| def _has_contribute_to_class(value): | |
| # Only call contribute_to_class() if it's bound. | |
| return not inspect.isclass(value) and hasattr(value, "contribute_to_class") | |
class ModelBase(type):
    """Metaclass for all models."""

    def __new__(cls, name, bases, attrs, **kwargs):
        """
        Build a new model class: split attrs into Django-aware and plain ones,
        attach _meta/exceptions, wire up proxy and multi-table inheritance,
        then register the finished model with the app registry.
        """
        super_new = super().__new__

        # Also ensure initialization is only performed for subclasses of Model
        # (excluding Model class itself).
        parents = [b for b in bases if isinstance(b, ModelBase)]
        if not parents:
            return super_new(cls, name, bases, attrs)

        # Create the class.
        module = attrs.pop("__module__")
        new_attrs = {"__module__": module}
        classcell = attrs.pop("__classcell__", None)
        if classcell is not None:
            new_attrs["__classcell__"] = classcell
        attr_meta = attrs.pop("Meta", None)
        # Pass all attrs without a (Django-specific) contribute_to_class()
        # method to type.__new__() so that they're properly initialized
        # (i.e. __set_name__()).
        contributable_attrs = {}
        for obj_name, obj in attrs.items():
            if _has_contribute_to_class(obj):
                contributable_attrs[obj_name] = obj
            else:
                new_attrs[obj_name] = obj
        new_class = super_new(cls, name, bases, new_attrs, **kwargs)

        abstract = getattr(attr_meta, "abstract", False)
        meta = attr_meta or getattr(new_class, "Meta", None)
        base_meta = getattr(new_class, "_meta", None)

        app_label = None

        # Look for an application configuration to attach the model to.
        app_config = apps.get_containing_app_config(module)

        if getattr(meta, "app_label", None) is None:
            if app_config is None:
                if not abstract:
                    raise RuntimeError(
                        "Model class %s.%s doesn't declare an explicit "
                        "app_label and isn't in an application in "
                        "INSTALLED_APPS." % (module, name)
                    )
            else:
                app_label = app_config.label

        new_class.add_to_class("_meta", Options(meta, app_label))
        if not abstract:
            # Per-model exception classes inherit from the corresponding
            # exceptions of concrete parents, falling back to the generic ones.
            new_class.add_to_class(
                "DoesNotExist",
                subclass_exception(
                    "DoesNotExist",
                    tuple(
                        x.DoesNotExist
                        for x in parents
                        if hasattr(x, "_meta") and not x._meta.abstract
                    )
                    or (ObjectDoesNotExist,),
                    module,
                    attached_to=new_class,
                ),
            )
            new_class.add_to_class(
                "MultipleObjectsReturned",
                subclass_exception(
                    "MultipleObjectsReturned",
                    tuple(
                        x.MultipleObjectsReturned
                        for x in parents
                        if hasattr(x, "_meta") and not x._meta.abstract
                    )
                    or (MultipleObjectsReturned,),
                    module,
                    attached_to=new_class,
                ),
            )
            new_class.add_to_class(
                "NotUpdated",
                subclass_exception(
                    "NotUpdated",
                    tuple(
                        x.NotUpdated
                        for x in parents
                        if hasattr(x, "_meta") and not x._meta.abstract
                    )
                    # Subclass DatabaseError as well for backward compatibility
                    # reasons as __subclasshook__ is not taken into account on
                    # exception handling.
                    or (ObjectNotUpdated, DatabaseError),
                    module,
                    attached_to=new_class,
                ),
            )
            if base_meta and not base_meta.abstract:
                # Non-abstract child classes inherit some attributes from their
                # non-abstract parent (unless an ABC comes before it in the
                # method resolution order).
                if not hasattr(meta, "ordering"):
                    new_class._meta.ordering = base_meta.ordering
                if not hasattr(meta, "get_latest_by"):
                    new_class._meta.get_latest_by = base_meta.get_latest_by

        is_proxy = new_class._meta.proxy

        # If the model is a proxy, ensure that the base class
        # hasn't been swapped out.
        if is_proxy and base_meta and base_meta.swapped:
            raise TypeError(
                "%s cannot proxy the swapped model '%s'." % (name, base_meta.swapped)
            )

        # Add remaining attributes (those with a contribute_to_class() method)
        # to the class.
        for obj_name, obj in contributable_attrs.items():
            new_class.add_to_class(obj_name, obj)

        # All the fields of any type declared on this model
        new_fields = chain(
            new_class._meta.local_fields,
            new_class._meta.local_many_to_many,
            new_class._meta.private_fields,
        )
        field_names = {f.name for f in new_fields}

        # Basic setup for proxy models.
        if is_proxy:
            base = None
            for parent in [kls for kls in parents if hasattr(kls, "_meta")]:
                if parent._meta.abstract:
                    if parent._meta.fields:
                        raise TypeError(
                            "Abstract base class containing model fields not "
                            "permitted for proxy model '%s'." % name
                        )
                    else:
                        continue
                if base is None:
                    base = parent
                elif parent._meta.concrete_model is not base._meta.concrete_model:
                    raise TypeError(
                        "Proxy model '%s' has more than one non-abstract model base "
                        "class." % name
                    )
            if base is None:
                raise TypeError(
                    "Proxy model '%s' has no non-abstract model base class." % name
                )
            new_class._meta.setup_proxy(base)
            new_class._meta.concrete_model = base._meta.concrete_model
        else:
            new_class._meta.concrete_model = new_class

        # Collect the parent links for multi-table inheritance.
        parent_links = {}
        for base in reversed([new_class, *parents]):
            # Conceptually equivalent to `if base is Model`.
            if not hasattr(base, "_meta"):
                continue
            # Skip concrete parent classes.
            if base != new_class and not base._meta.abstract:
                continue
            # Locate OneToOneField instances.
            for field in base._meta.local_fields:
                if isinstance(field, OneToOneField) and field.remote_field.parent_link:
                    related = resolve_relation(new_class, field.remote_field.model)
                    parent_links[make_model_tuple(related)] = field

        # Track fields inherited from base models.
        inherited_attributes = set()
        # Do the appropriate setup for any model parents.
        for base in new_class.mro():
            if base not in parents or not hasattr(base, "_meta"):
                # Things without _meta aren't functional models, so they're
                # uninteresting parents.
                inherited_attributes.update(base.__dict__)
                continue

            parent_fields = base._meta.local_fields + base._meta.local_many_to_many
            if not base._meta.abstract:
                # Check for clashes between locally declared fields and those
                # on the base classes.
                for field in parent_fields:
                    if field.name in field_names:
                        raise FieldError(
                            "Local field %r in class %r clashes with field of "
                            "the same name from base class %r."
                            % (
                                field.name,
                                name,
                                base.__name__,
                            )
                        )
                    else:
                        inherited_attributes.add(field.name)

                # Concrete classes...
                base = base._meta.concrete_model
                base_key = make_model_tuple(base)
                if base_key in parent_links:
                    field = parent_links[base_key]
                elif not is_proxy:
                    attr_name = "%s_ptr" % base._meta.model_name
                    field = OneToOneField(
                        base,
                        on_delete=CASCADE,
                        name=attr_name,
                        auto_created=True,
                        parent_link=True,
                    )

                    if attr_name in field_names:
                        raise FieldError(
                            "Auto-generated field '%s' in class %r for "
                            "parent_link to base class %r clashes with "
                            "declared field of the same name."
                            % (
                                attr_name,
                                name,
                                base.__name__,
                            )
                        )

                    # Only add the ptr field if it's not already present;
                    # e.g. migrations will already have it specified
                    if not hasattr(new_class, attr_name):
                        new_class.add_to_class(attr_name, field)
                else:
                    field = None
                new_class._meta.parents[base] = field
            else:
                base_parents = base._meta.parents.copy()

                # Add fields from abstract base class if it wasn't overridden.
                for field in parent_fields:
                    if (
                        field.name not in field_names
                        and field.name not in new_class.__dict__
                        and field.name not in inherited_attributes
                    ):
                        new_field = copy.deepcopy(field)
                        new_class.add_to_class(field.name, new_field)
                        # Replace parent links defined on this base by the new
                        # field. It will be appropriately resolved if required.
                        if field.one_to_one:
                            for parent, parent_link in base_parents.items():
                                if field == parent_link:
                                    base_parents[parent] = new_field

                # Pass any non-abstract parent classes onto child.
                new_class._meta.parents.update(base_parents)

            # Inherit private fields (like GenericForeignKey) from the parent
            # class if they are not overridden.
            for field in base._meta.private_fields:
                if field.name in field_names:
                    if not base._meta.abstract:
                        raise FieldError(
                            "Local field %r in class %r clashes with field of "
                            "the same name from base class %r."
                            % (
                                field.name,
                                name,
                                base.__name__,
                            )
                        )
                elif (
                    field.name not in new_class.__dict__
                    and field.name not in inherited_attributes
                ):
                    field = copy.deepcopy(field)
                    if not base._meta.abstract:
                        field.mti_inherited = True
                    new_class.add_to_class(field.name, field)

        # Copy indexes so that index names are unique when models extend an
        # abstract model.
        new_class._meta.indexes = [
            copy.deepcopy(idx) for idx in new_class._meta.indexes
        ]

        if abstract:
            # Abstract base models can't be instantiated and don't appear in
            # the list of models for an app. We do the final setup for them a
            # little differently from normal models.
            attr_meta.abstract = False
            new_class.Meta = attr_meta
            return new_class

        new_class._prepare()
        new_class._meta.apps.register_model(new_class._meta.app_label, new_class)
        return new_class

    def add_to_class(cls, name, value):
        """
        Attach `value` to the class under `name`, delegating to the object's
        own contribute_to_class() when it defines one (fields, managers,
        Options); otherwise fall back to plain setattr().
        """
        if _has_contribute_to_class(value):
            value.contribute_to_class(cls, name)
        else:
            setattr(cls, name, value)

    def _prepare(cls):
        """Create some methods once self._meta has been populated."""
        opts = cls._meta
        opts._prepare(cls)

        if opts.order_with_respect_to:
            cls.get_next_in_order = partialmethod(
                cls._get_next_or_previous_in_order, is_next=True
            )
            cls.get_previous_in_order = partialmethod(
                cls._get_next_or_previous_in_order, is_next=False
            )

            # Defer creating accessors on the foreign class until it has been
            # created and registered. If remote_field is None, we're ordering
            # with respect to a GenericForeignKey and don't know what the
            # foreign class is - we'll add those accessors later in
            # contribute_to_class().
            if opts.order_with_respect_to.remote_field:
                wrt = opts.order_with_respect_to
                remote = wrt.remote_field.model
                lazy_related_operation(make_foreign_order_accessors, cls, remote)

        # Give the class a docstring -- its definition.
        if cls.__doc__ is None:
            cls.__doc__ = "%s(%s)" % (
                cls.__name__,
                ", ".join(f.name for f in opts.fields),
            )

        get_absolute_url_override = settings.ABSOLUTE_URL_OVERRIDES.get(
            opts.label_lower
        )
        if get_absolute_url_override:
            setattr(cls, "get_absolute_url", get_absolute_url_override)

        if not opts.managers:
            if any(f.name == "objects" for f in opts.fields):
                raise ValueError(
                    "Model %s must specify a custom Manager, because it has a "
                    "field named 'objects'." % cls.__name__
                )
            manager = Manager()
            manager.auto_created = True
            cls.add_to_class("objects", manager)

        # Set the name of _meta.indexes. This can't be done in
        # Options.contribute_to_class() because fields haven't been added to
        # the model at that point.
        for index in cls._meta.indexes:
            if not index.name:
                index.set_name_with_model(cls)

        class_prepared.send(sender=cls)

    # These must be properties: model code accesses Model._base_manager /
    # Model._default_manager as attributes (e.g. refresh_from_db() below
    # does `self.__class__._base_manager.db_manager(...)`). The decorators
    # had been lost; restore them.
    @property
    def _base_manager(cls):
        return cls._meta.base_manager

    @property
    def _default_manager(cls):
        return cls._meta.default_manager
class ModelStateFieldsCacheDescriptor:
    """
    Non-data descriptor that lazily creates the per-instance cache of related
    objects on first access and stores it on the instance, shadowing itself.
    """

    def __get__(self, instance, cls=None):
        # Class-level access returns the descriptor itself.
        if instance is None:
            return self
        cache = {}
        instance.fields_cache = cache
        return cache
class ModelState:
    """Store model instance state."""

    # Alias of the database this instance was loaded from / last saved to;
    # None while the instance has never touched a database.
    db = None
    # If true, uniqueness validation checks will consider this a new, unsaved
    # object. Necessary for correct validation of new instances of objects with
    # explicit (non-auto) PKs. This impacts validation only; it has no effect
    # on the actual save.
    adding = True
    # Lazily-created per-instance cache of related objects (see descriptor).
    fields_cache = ModelStateFieldsCacheDescriptor()
| class Model(AltersData, metaclass=ModelBase): | |
    def __init__(self, *args, **kwargs):
        """
        Populate field values from positional/keyword arguments (falling back
        to field defaults), then emit the pre_init/post_init signals.
        Positional args follow _meta field order; DEFERRED values are skipped.
        """
        # Alias some things as locals to avoid repeat global lookups
        cls = self.__class__
        opts = self._meta
        _setattr = setattr
        _DEFERRED = DEFERRED
        if opts.abstract:
            raise TypeError("Abstract models cannot be instantiated.")

        pre_init.send(sender=cls, args=args, kwargs=kwargs)

        # Set up the storage for instance state
        self._state = ModelState()

        # There is a rather weird disparity here; if kwargs, it's set, then
        # args overrides it. It should be one or the other; don't duplicate the
        # work The reason for the kwargs check is that standard iterator passes
        # in by args, and instantiation for iteration is 33% faster.
        if len(args) > len(opts.concrete_fields):
            # Daft, but matches old exception sans the err msg.
            raise IndexError("Number of args exceeds number of fields")

        if not kwargs:
            # Fast path: positional-only construction (used when iterating
            # querysets) walks concrete fields in order.
            fields_iter = iter(opts.concrete_fields)
            # The ordering of the zip calls matter - zip throws StopIteration
            # when an iter throws it. So if the first iter throws it, the
            # second is *not* consumed. We rely on this, so don't change the
            # order without changing the logic.
            for val, field in zip(args, fields_iter):
                if val is _DEFERRED:
                    continue
                _setattr(self, field.attname, val)
        else:
            # Slower, kwargs-ready version.
            fields_iter = iter(opts.fields)
            for val, field in zip(args, fields_iter):
                if val is _DEFERRED:
                    continue
                _setattr(self, field.attname, val)
                if kwargs.pop(field.name, NOT_PROVIDED) is not NOT_PROVIDED:
                    raise TypeError(
                        f"{cls.__qualname__}() got both positional and "
                        f"keyword arguments for field '{field.name}'."
                    )

        # Now we're left with the unprocessed fields that *must* come from
        # keywords, or default.

        for field in fields_iter:
            is_related_object = False
            # Virtual field
            if field.column is None or field.generated:
                continue
            if kwargs:
                if isinstance(field.remote_field, ForeignObjectRel):
                    try:
                        # Assume object instance was passed in.
                        rel_obj = kwargs.pop(field.name)
                        is_related_object = True
                    except KeyError:
                        try:
                            # Object instance wasn't passed in -- must be an
                            # ID.
                            val = kwargs.pop(field.attname)
                        except KeyError:
                            val = field.get_default()
                else:
                    try:
                        val = kwargs.pop(field.attname)
                    except KeyError:
                        # This is done with an exception rather than the
                        # default argument on pop because we don't want
                        # get_default() to be evaluated, and then not used.
                        # Refs #12057.
                        val = field.get_default()
            else:
                val = field.get_default()

            if is_related_object:
                # If we are passed a related instance, set it using the
                # field.name instead of field.attname (e.g. "user" instead of
                # "user_id") so that the object gets properly cached (and type
                # checked) by the RelatedObjectDescriptor.
                if rel_obj is not _DEFERRED:
                    _setattr(self, field.name, rel_obj)
            else:
                if val is not _DEFERRED:
                    _setattr(self, field.attname, val)

        if kwargs:
            # Leftover kwargs must map to properties or virtual fields;
            # anything else is reported in a single TypeError.
            property_names = opts._property_names
            unexpected = ()
            for prop, value in kwargs.items():
                # Any remaining kwargs must correspond to properties or virtual
                # fields.
                if prop in property_names:
                    if value is not _DEFERRED:
                        _setattr(self, prop, value)
                else:
                    try:
                        opts.get_field(prop)
                    except FieldDoesNotExist:
                        unexpected += (prop,)
                    else:
                        if value is not _DEFERRED:
                            _setattr(self, prop, value)
            if unexpected:
                unexpected_names = ", ".join(repr(n) for n in unexpected)
                raise TypeError(
                    f"{cls.__name__}() got unexpected keyword arguments: "
                    f"{unexpected_names}"
                )
        super().__init__()
        post_init.send(sender=cls, instance=self)
| def from_db(cls, db, field_names, values): | |
| if len(values) != len(cls._meta.concrete_fields): | |
| values_iter = iter(values) | |
| values = [ | |
| next(values_iter) if f.attname in field_names else DEFERRED | |
| for f in cls._meta.concrete_fields | |
| ] | |
| new = cls(*values) | |
| new._state.adding = False | |
| new._state.db = db | |
| return new | |
| def __repr__(self): | |
| return "<%s: %s>" % (self.__class__.__name__, self) | |
| def __str__(self): | |
| return "%s object (%s)" % (self.__class__.__name__, self.pk) | |
| def __eq__(self, other): | |
| if not isinstance(other, Model): | |
| return NotImplemented | |
| if self._meta.concrete_model != other._meta.concrete_model: | |
| return False | |
| my_pk = self.pk | |
| if my_pk is None: | |
| return self is other | |
| return my_pk == other.pk | |
| def __hash__(self): | |
| if not self._is_pk_set(): | |
| raise TypeError("Model instances without primary key value are unhashable") | |
| return hash(self.pk) | |
| def __reduce__(self): | |
| data = self.__getstate__() | |
| data[DJANGO_VERSION_PICKLE_KEY] = django.__version__ | |
| class_id = self._meta.app_label, self._meta.object_name | |
| return model_unpickle, (class_id,), data | |
| def __getstate__(self): | |
| """Hook to allow choosing the attributes to pickle.""" | |
| state = self.__dict__.copy() | |
| state["_state"] = copy.copy(state["_state"]) | |
| state["_state"].fields_cache = state["_state"].fields_cache.copy() | |
| # memoryview cannot be pickled, so cast it to bytes and store | |
| # separately. | |
| _memoryview_attrs = [] | |
| for attr, value in state.items(): | |
| if isinstance(value, memoryview): | |
| _memoryview_attrs.append((attr, bytes(value))) | |
| if _memoryview_attrs: | |
| state["_memoryview_attrs"] = _memoryview_attrs | |
| for attr, value in _memoryview_attrs: | |
| state.pop(attr) | |
| return state | |
| def __setstate__(self, state): | |
| pickled_version = state.get(DJANGO_VERSION_PICKLE_KEY) | |
| if pickled_version: | |
| if pickled_version != django.__version__: | |
| warnings.warn( | |
| "Pickled model instance's Django version %s does not " | |
| "match the current version %s." | |
| % (pickled_version, django.__version__), | |
| RuntimeWarning, | |
| stacklevel=2, | |
| ) | |
| else: | |
| warnings.warn( | |
| "Pickled model instance's Django version is not specified.", | |
| RuntimeWarning, | |
| stacklevel=2, | |
| ) | |
| if "_memoryview_attrs" in state: | |
| for attr, value in state.pop("_memoryview_attrs"): | |
| state[attr] = memoryview(value) | |
| self.__dict__.update(state) | |
| def _get_pk_val(self, meta=None): | |
| meta = meta or self._meta | |
| return getattr(self, meta.pk.attname) | |
| def _set_pk_val(self, value): | |
| for parent_link in self._meta.parents.values(): | |
| if parent_link and parent_link != self._meta.pk: | |
| setattr(self, parent_link.target_field.attname, value) | |
| return setattr(self, self._meta.pk.attname, value) | |
| pk = property(_get_pk_val, _set_pk_val) | |
| def _is_pk_set(self, meta=None): | |
| pk_val = self._get_pk_val(meta) | |
| return not ( | |
| pk_val is None | |
| or (isinstance(pk_val, tuple) and any(f is None for f in pk_val)) | |
| ) | |
| def get_deferred_fields(self): | |
| """ | |
| Return a set containing names of deferred fields for this instance. | |
| """ | |
| return { | |
| f.attname | |
| for f in self._meta.concrete_fields | |
| if f.attname not in self.__dict__ | |
| } | |
    def refresh_from_db(self, using=None, fields=None, from_queryset=None):
        """
        Reload field values from the database.

        By default, the reloading happens from the database this instance was
        loaded from, or by the read router if this instance wasn't loaded from
        any database. The using parameter will override the default.

        Fields can be used to specify which fields to reload. The fields
        should be an iterable of field attnames. If fields is None, then
        all non-deferred fields are reloaded.

        When accessing deferred fields of an instance, the deferred loading
        of the field will call this method.
        """
        if fields is None:
            # Full refresh: throw away all prefetched relations.
            self._prefetched_objects_cache = {}
        else:
            # Partial refresh: drop only the prefetches that match requested
            # fields; those need no DB query of their own.
            prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", ())
            fields = set(fields)
            for field in fields.copy():
                if field in prefetched_objects_cache:
                    del prefetched_objects_cache[field]
                    fields.remove(field)
            if not fields:
                return
            if any(LOOKUP_SEP in f for f in fields):
                raise ValueError(
                    'Found "%s" in fields argument. Relations and transforms '
                    "are not allowed in fields." % LOOKUP_SEP
                )

        if from_queryset is None:
            hints = {"instance": self}
            from_queryset = self.__class__._base_manager.db_manager(using, hints=hints)
        elif using is not None:
            from_queryset = from_queryset.using(using)

        db_instance_qs = from_queryset.filter(pk=self.pk)

        # Use provided fields, if not set then reload all non-deferred fields.
        deferred_fields = self.get_deferred_fields()
        if fields is not None:
            db_instance_qs = db_instance_qs.only(*fields)
        elif deferred_fields:
            db_instance_qs = db_instance_qs.only(
                *{
                    f.attname
                    for f in self._meta.concrete_fields
                    if f.attname not in deferred_fields
                }
            )

        db_instance = db_instance_qs.get()
        non_loaded_fields = db_instance.get_deferred_fields()
        for field in self._meta.fields:
            if field.attname in non_loaded_fields:
                # This field wasn't refreshed - skip ahead.
                continue
            if field.concrete:
                setattr(self, field.attname, getattr(db_instance, field.attname))
            # Clear or copy cached foreign keys.
            if field.is_relation:
                if field.is_cached(db_instance):
                    field.set_cached_value(self, field.get_cached_value(db_instance))
                elif field.is_cached(self):
                    field.delete_cached_value(self)

        # Clear cached relations.
        for rel in self._meta.related_objects:
            if (fields is None or rel.name in fields) and rel.is_cached(self):
                rel.delete_cached_value(self)

        # Clear cached private relations.
        for field in self._meta.private_fields:
            if (
                (fields is None or field.name in fields)
                and field.is_relation
                and field.is_cached(self)
            ):
                field.delete_cached_value(self)

        self._state.db = db_instance._state.db
| async def arefresh_from_db(self, using=None, fields=None, from_queryset=None): | |
| return await sync_to_async(self.refresh_from_db)( | |
| using=using, fields=fields, from_queryset=from_queryset | |
| ) | |
| def serializable_value(self, field_name): | |
| """ | |
| Return the value of the field name for this instance. If the field is | |
| a foreign key, return the id value instead of the object. If there's | |
| no Field object with this name on the model, return the model | |
| attribute's value. | |
| Used to serialize a field's value (in the serializer, or form output, | |
| for example). Normally, you would just access the attribute directly | |
| and not use this method. | |
| """ | |
| try: | |
| field = self._meta.get_field(field_name) | |
| except FieldDoesNotExist: | |
| return getattr(self, field_name) | |
| return getattr(self, field.attname) | |
    def save(
        self,
        *,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Save the current instance. Override this in a subclass if you want to
        control the saving process.

        The 'force_insert' and 'force_update' parameters can be used to insist
        that the "save" must be an SQL insert or update (or equivalent for
        non-SQL backends), respectively. Normally, they should not be set.
        """
        self._prepare_related_fields_for_save(operation_name="save")

        using = using or router.db_for_write(self.__class__, instance=self)
        if force_insert and (force_update or update_fields):
            raise ValueError("Cannot force both insert and updating in model saving.")

        # Fields never loaded from the DB (deferred), excluding generated
        # columns which are computed by the database itself.
        deferred_non_generated_fields = {
            f.attname
            for f in self._meta.concrete_fields
            if f.attname not in self.__dict__ and f.generated is False
        }
        if update_fields is not None:
            # If update_fields is empty, skip the save. We do also check for
            # no-op saves later on for inheritance cases. This bailout is
            # still needed for skipping signal sending.
            if not update_fields:
                return

            update_fields = frozenset(update_fields)
            field_names = self._meta._non_pk_concrete_field_names
            not_updatable_fields = update_fields.difference(field_names)
            if not_updatable_fields:
                raise ValueError(
                    "The following fields do not exist in this model, are m2m "
                    "fields, primary keys, or are non-concrete fields: %s"
                    % ", ".join(not_updatable_fields)
                )
        # If saving to the same database, and this model is deferred, then
        # automatically do an "update_fields" save on the loaded fields.
        elif (
            not force_insert
            and deferred_non_generated_fields
            and using == self._state.db
            and self._is_pk_set()
        ):
            field_names = set()
            pk_fields = self._meta.pk_fields
            for field in self._meta.concrete_fields:
                # Exclude pk fields and m2m-style fields (which have
                # a 'through' attribute) from the implicit update set.
                if field not in pk_fields and not hasattr(field, "through"):
                    field_names.add(field.attname)

            loaded_fields = field_names.difference(deferred_non_generated_fields)
            if loaded_fields:
                update_fields = frozenset(loaded_fields)

        self.save_base(
            using=using,
            force_insert=force_insert,
            force_update=force_update,
            update_fields=update_fields,
        )

    save.alters_data = True
| async def asave( | |
| self, | |
| *, | |
| force_insert=False, | |
| force_update=False, | |
| using=None, | |
| update_fields=None, | |
| ): | |
| return await sync_to_async(self.save)( | |
| force_insert=force_insert, | |
| force_update=force_update, | |
| using=using, | |
| update_fields=update_fields, | |
| ) | |
| asave.alters_data = True | |
| def _validate_force_insert(cls, force_insert): | |
| if force_insert is False: | |
| return () | |
| if force_insert is True: | |
| return (cls,) | |
| if not isinstance(force_insert, tuple): | |
| raise TypeError("force_insert must be a bool or tuple.") | |
| for member in force_insert: | |
| if not isinstance(member, ModelBase): | |
| raise TypeError( | |
| f"Invalid force_insert member. {member!r} must be a model subclass." | |
| ) | |
| if not issubclass(cls, member): | |
| raise TypeError( | |
| f"Invalid force_insert member. {member.__qualname__} must be a " | |
| f"base of {cls.__qualname__}." | |
| ) | |
| return force_insert | |
    def save_base(
        self,
        raw=False,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Handle the parts of saving which should be done only once per save,
        yet need to be done in raw saves, too. This includes some sanity
        checks and signal sending.

        The 'raw' argument is telling save_base not to save any parent
        models and not to do any changes to the values before save. This
        is used by fixture loading.
        """
        using = using or router.db_for_write(self.__class__, instance=self)
        # NOTE: assert-based preconditions are stripped under `python -O`.
        assert not (force_insert and (force_update or update_fields))
        assert update_fields is None or update_fields
        cls = origin = self.__class__
        # Skip proxies, but keep the origin as the proxy model.
        if cls._meta.proxy:
            cls = cls._meta.concrete_model
        meta = cls._meta
        if not meta.auto_created:
            pre_save.send(
                sender=origin,
                instance=self,
                raw=raw,
                using=using,
                update_fields=update_fields,
            )
        # A transaction isn't needed if one query is issued.
        if meta.parents:
            context_manager = transaction.atomic(using=using, savepoint=False)
        else:
            context_manager = transaction.mark_for_rollback_on_error(using=using)
        with context_manager:
            parent_inserted = False
            if not raw:
                # Validate force insert only when parents are inserted.
                force_insert = self._validate_force_insert(force_insert)
                parent_inserted = self._save_parents(
                    cls, using, update_fields, force_insert
                )
            updated = self._save_table(
                raw,
                cls,
                force_insert or parent_inserted,
                force_update,
                using,
                update_fields,
            )
        # Store the database on which the object was saved
        self._state.db = using
        # Once saved, this is no longer a to-be-added instance.
        self._state.adding = False

        # Signal that the save is complete
        if not meta.auto_created:
            post_save.send(
                sender=origin,
                instance=self,
                created=(not updated),
                update_fields=update_fields,
                raw=raw,
                using=using,
            )

    save_base.alters_data = True
def _save_parents(
    self, cls, using, update_fields, force_insert, updated_parents=None
):
    """
    Save all the parents of cls using values from self.

    Recurse up the multi-table-inheritance chain so that grandparents are
    saved before parents. ``updated_parents`` memoizes the per-parent
    result so diamond-shaped inheritance does not save the same parent
    twice. Return True if any parent row was inserted rather than
    updated.
    """
    meta = cls._meta
    inserted = False
    if updated_parents is None:
        updated_parents = {}
    for parent, field in meta.parents.items():
        # Make sure the link fields are synced between parent and self.
        if (
            field
            and getattr(self, parent._meta.pk.attname) is None
            and getattr(self, field.attname) is not None
        ):
            setattr(self, parent._meta.pk.attname, getattr(self, field.attname))
        # None means this parent hasn't been saved yet in this call tree.
        if (parent_updated := updated_parents.get(parent)) is None:
            parent_inserted = self._save_parents(
                cls=parent,
                using=using,
                update_fields=update_fields,
                force_insert=force_insert,
                updated_parents=updated_parents,
            )
            # force_insert is a tuple of model classes here (see
            # _validate_force_insert); an insert is forced for this parent
            # when it is one of them or its own parent was inserted.
            updated = self._save_table(
                cls=parent,
                using=using,
                update_fields=update_fields,
                force_insert=parent_inserted or issubclass(parent, force_insert),
            )
            if not updated:
                inserted = True
            updated_parents[parent] = updated
        elif not parent_updated:
            # Memoized result says this parent was inserted earlier.
            inserted = True
        # Set the parent's PK value to self.
        if field:
            setattr(self, field.attname, self._get_pk_val(parent._meta))
            # Since we didn't have an instance of the parent handy set
            # attname directly, bypassing the descriptor. Invalidate
            # the related object cache, in case it's been accidentally
            # populated. A fresh instance will be re-built from the
            # database if necessary.
            if field.is_cached(self):
                field.delete_cached_value(self)
    return inserted
def _save_table(
    self,
    raw=False,
    cls=None,
    force_insert=False,
    force_update=False,
    using=None,
    update_fields=None,
):
    """
    Do the heavy-lifting involved in saving. Update or insert the data
    for a single table.

    Return True if an existing row was updated, False if a row was
    inserted (or if nothing was updated).
    """
    meta = cls._meta
    pk_fields = meta.pk_fields
    # Candidate columns for the UPDATE/INSERT: concrete, local,
    # non-generated, non-primary-key fields.
    non_pks_non_generated = [
        f
        for f in meta.local_concrete_fields
        if f not in pk_fields and not f.generated
    ]
    if update_fields:
        # Restrict to the explicitly requested fields; match by either
        # the field name or its attname.
        non_pks_non_generated = [
            f
            for f in non_pks_non_generated
            if f.name in update_fields or f.attname in update_fields
        ]
    if not self._is_pk_set(meta):
        # Let the PK field produce a value (e.g. a UUID default) before
        # choosing between UPDATE and INSERT.
        pk_val = meta.pk.get_pk_value_on_save(self)
        setattr(self, meta.pk.attname, pk_val)
    pk_set = self._is_pk_set(meta)
    if not pk_set and (force_update or update_fields):
        raise ValueError("Cannot force an update in save() with no primary key.")
    updated = False
    # Skip an UPDATE when adding an instance and primary key has a default.
    if (
        not raw
        and not force_insert
        and not force_update
        and self._state.adding
        and all(f.has_default() or f.has_db_default() for f in meta.pk_fields)
    ):
        force_insert = True
    # If possible, try an UPDATE. If that doesn't update anything, do an
    # INSERT.
    if pk_set and not force_insert:
        base_qs = cls._base_manager.using(using)
        # (field, model, value) triples; raw saves use attribute values
        # as-is, otherwise each field gets a pre_save() hook.
        values = [
            (
                f,
                None,
                (getattr(self, f.attname) if raw else f.pre_save(self, False)),
            )
            for f in non_pks_non_generated
        ]
        forced_update = update_fields or force_update
        pk_val = self._get_pk_val(meta)
        # Generated columns that depend on updated columns must be
        # fetched back from the database after the UPDATE.
        returning_fields = [
            f
            for f in meta.local_concrete_fields
            if (
                f.generated
                and f.referenced_fields.intersection(non_pks_non_generated)
            )
        ]
        # Expression values (anything with resolve_expression) are
        # computed by the database, so fetch their final values too.
        for field, _model, value in values:
            if (update_fields is None or field.name in update_fields) and hasattr(
                value, "resolve_expression"
            ):
                returning_fields.append(field)
        results = self._do_update(
            base_qs,
            using,
            pk_val,
            values,
            update_fields,
            forced_update,
            returning_fields,
        )
        if updated := bool(results):
            self._assign_returned_values(results[0], returning_fields)
        elif force_update:
            raise self.NotUpdated("Forced update did not affect any rows.")
        elif update_fields:
            raise self.NotUpdated(
                "Save with update_fields did not affect any rows."
            )
    if not updated:
        if meta.order_with_respect_to:
            # If this is a model with an order_with_respect_to
            # autopopulate the _order field
            field = meta.order_with_respect_to
            filter_args = field.get_filter_kwargs_for_object(self)
            self._order = (
                cls._base_manager.using(using)
                .filter(**filter_args)
                .aggregate(
                    _order__max=Coalesce(
                        ExpressionWrapper(
                            Max("_order") + Value(1), output_field=IntegerField()
                        ),
                        Value(0),
                    ),
                )["_order__max"]
            )
        # Exclude the auto PK column when its value isn't set so the
        # database assigns one.
        insert_fields = [
            f
            for f in meta.local_concrete_fields
            if not f.generated and (pk_set or f is not meta.auto_field)
        ]
        returning_fields = list(meta.db_returning_fields)
        can_return_columns_from_insert = connections[
            using
        ].features.can_return_columns_from_insert
        for field in insert_fields:
            value = (
                getattr(self, field.attname)
                if raw
                else field.pre_save(self, add=True)
            )
            if hasattr(value, "resolve_expression"):
                # Database-computed value: fetch it back after INSERT.
                if field not in returning_fields:
                    returning_fields.append(field)
            elif (
                field.db_returning
                and not can_return_columns_from_insert
                and not (pk_set and field is meta.auto_field)
            ):
                # The backend can't return columns from INSERT; drop the
                # field from the returning list.
                returning_fields.remove(field)
        results = self._do_insert(
            cls._base_manager, using, insert_fields, returning_fields, raw
        )
        if results:
            self._assign_returned_values(results[0], returning_fields)
    return updated
def _do_update(
    self,
    base_qs,
    using,
    pk_val,
    values,
    update_fields,
    forced_update,
    returning_fields,
):
    """
    Try to update the model. Return True if the model was updated (if an
    update query was done and a matching row was found in the DB).

    Concretely, the return value is a list of returned-value rows: a
    non-empty list (possibly ``[()]``) means a row matched; an empty
    list means no row was updated.
    """
    filtered = base_qs.filter(pk=pk_val)
    if not values:
        # We can end up here when saving a model in inheritance chain where
        # update_fields doesn't target any field in current model. In that
        # case we just say the update succeeded. Another case ending up
        # here is a model with just PK - in that case check that the PK
        # still exists.
        if update_fields is not None or filtered.exists():
            return [()]
        return []
    if self._meta.select_on_save and not forced_update:
        # It may happen that the object is deleted from the DB right after
        # this check, causing the subsequent UPDATE to return zero matching
        # rows. The same result can occur in some rare cases when the
        # database returns zero despite the UPDATE being executed
        # successfully (a row is matched and updated). In order to
        # distinguish these two cases, the object's existence in the
        # database is again checked for if the UPDATE query returns 0.
        if not filtered.exists():
            return []
        if results := filtered._update(values, returning_fields):
            return results
        # UPDATE reported zero rows; trust a second existence check.
        return [()] if filtered.exists() else []
    return filtered._update(values, returning_fields)
| def _do_insert(self, manager, using, fields, returning_fields, raw): | |
| """ | |
| Do an INSERT. If returning_fields is defined then this method should | |
| return the newly created data for the model. | |
| """ | |
| return manager._insert( | |
| [self], | |
| fields=fields, | |
| returning_fields=returning_fields, | |
| using=using, | |
| raw=raw, | |
| ) | |
| def _assign_returned_values(self, returned_values, returning_fields): | |
| returning_fields_iter = iter(returning_fields) | |
| for value, field in zip(returned_values, returning_fields_iter): | |
| setattr(self, field.attname, value) | |
| # Defer all fields that were meant to be updated with their database | |
| # resolved values but couldn't as they are effectively stale. | |
| for field in returning_fields_iter: | |
| self.__dict__.pop(field.attname, None) | |
def _prepare_related_fields_for_save(self, operation_name, fields=None):
    """
    Raise ValueError if an unsaved related object is assigned to this
    instance, to avoid silent data loss during ``operation_name``.

    When ``fields`` is given, only those fields are inspected.
    """
    # Ensure that a model instance without a PK hasn't been assigned to
    # a ForeignKey, GenericForeignKey or OneToOneField on this model. If
    # the field is nullable, allowing the save would result in silent data
    # loss.
    for field in self._meta.concrete_fields:
        if fields and field not in fields:
            continue
        # If the related field isn't cached, then an instance hasn't been
        # assigned and there's no need to worry about this check.
        if field.is_relation and field.is_cached(self):
            obj = getattr(self, field.name, None)
            if not obj:
                continue
            # A pk may have been assigned manually to a model instance not
            # saved to the database (or auto-generated in a case like
            # UUIDField), but we allow the save to proceed and rely on the
            # database to raise an IntegrityError if applicable. If
            # constraints aren't supported by the database, there's the
            # unavoidable risk of data corruption.
            if not obj._is_pk_set():
                # Remove the object from a related instance cache.
                if not field.remote_field.multiple:
                    field.remote_field.delete_cached_value(obj)
                raise ValueError(
                    "%s() prohibited to prevent data loss due to unsaved "
                    "related object '%s'." % (operation_name, field.name)
                )
            elif getattr(self, field.attname) in field.empty_values:
                # Set related object if it has been saved after an
                # assignment.
                setattr(self, field.name, obj)
            # If the relationship's pk/to_field was changed, clear the
            # cached relationship.
            if getattr(obj, field.target_field.attname) != getattr(
                self, field.attname
            ):
                field.delete_cached_value(self)
    # GenericForeignKeys are private.
    for field in self._meta.private_fields:
        if fields and field not in fields:
            continue
        # The fk_field check narrows this to GFK-like fields without
        # importing contenttypes here.
        if (
            field.is_relation
            and field.is_cached(self)
            and hasattr(field, "fk_field")
        ):
            obj = field.get_cached_value(self, default=None)
            if obj and not obj._is_pk_set():
                raise ValueError(
                    f"{operation_name}() prohibited to prevent data loss due to "
                    f"unsaved related object '{field.name}'."
                )
def delete(self, using=None, keep_parents=False):
    """
    Delete this instance's row, cascading to dependent related rows.

    Raise ValueError when the instance has no primary key, since no row
    can be identified for deletion.
    """
    if not self._is_pk_set():
        raise ValueError(
            "%s object can't be deleted because its %s attribute is set "
            "to None." % (self._meta.object_name, self._meta.pk.attname)
        )
    db = using or router.db_for_write(self.__class__, instance=self)
    collector = Collector(using=db, origin=self)
    collector.collect([self], keep_parents=keep_parents)
    return collector.delete()

delete.alters_data = True
async def adelete(self, using=None, keep_parents=False):
    """Asynchronous variant of delete(), run in a worker thread."""
    sync_delete = sync_to_async(self.delete)
    return await sync_delete(using=using, keep_parents=keep_parents)

adelete.alters_data = True
def _get_FIELD_display(self, field):
    """
    Return the display label for this instance's value of a field with
    choices, falling back to the raw value if it has no matching choice.
    """
    raw = getattr(self, field.attname)
    choices = dict(make_hashable(field.flatchoices))
    display = choices.get(make_hashable(raw), raw)
    # force_str() to coerce lazy strings.
    return force_str(display, strings_only=True)
def _get_next_or_previous_by_FIELD(self, field, is_next, **kwargs):
    """
    Return the neighboring object ordered by ``field``, using the pk as
    a tie-breaker for equal field values. Raise DoesNotExist when no
    neighbor exists.
    """
    if not self._is_pk_set():
        raise ValueError("get_next/get_previous cannot be used on unsaved objects.")
    if is_next:
        op, prefix = "gt", ""
    else:
        op, prefix = "lt", "-"
    current = getattr(self, field.attname)
    # Same field value but a later/earlier pk...
    tiebreak = Q.create(
        [(field.name, current), (f"pk__{op}", self.pk)], connector=Q.AND
    )
    # ...or a strictly greater/smaller field value.
    criteria = Q.create(
        [tiebreak, (f"{field.name}__{op}", current)], connector=Q.OR
    )
    qs = (
        self.__class__._default_manager.using(self._state.db)
        .filter(**kwargs)
        .filter(criteria)
        .order_by("%s%s" % (prefix, field.name), "%spk" % prefix)
    )
    try:
        return qs[0]
    except IndexError:
        raise self.DoesNotExist(
            "%s matching query does not exist." % self.__class__._meta.object_name
        )
def _get_next_or_previous_in_order(self, is_next):
    """
    Return the neighboring object by the _order column (models declared
    with order_with_respect_to), caching the result on the instance.
    """
    cache_attr = "__%s_order_cache" % is_next
    try:
        return getattr(self, cache_attr)
    except AttributeError:
        pass
    op = "gt" if is_next else "lt"
    ordering = "_order" if is_next else "-_order"
    order_field = self._meta.order_with_respect_to
    filter_args = order_field.get_filter_kwargs_for_object(self)
    # Subquery yielding this instance's current _order value.
    current_order = self.__class__._default_manager.values("_order").filter(
        **{self._meta.pk.name: self.pk}
    )
    neighbor = (
        self.__class__._default_manager.filter(**filter_args)
        .filter(**{"_order__%s" % op: current_order})
        .order_by(ordering)[:1]
        .get()
    )
    setattr(self, cache_attr, neighbor)
    return neighbor
def _get_field_expression_map(self, meta, exclude=None):
    """
    Return a dict mapping field names (and attnames, plus "pk") to
    expressions holding this instance's current values.

    Generated fields are represented by their defining expression with
    field references replaced by the instance's values, so callers can
    resolve them without a database round trip.
    """
    if exclude is None:
        exclude = set()
    meta = meta or self._meta
    field_map = {}
    generated_fields = []
    for field in meta.local_fields:
        if field.name in exclude:
            continue
        if field.generated:
            # Skip generated fields that reference an excluded field;
            # their value could not be computed consistently.
            if any(
                ref[0] in exclude
                for ref in self._get_expr_references(field.expression)
            ):
                continue
            generated_fields.append(field)
            continue
        if (
            isinstance(field.remote_field, ForeignObjectRel)
            and field not in meta.local_concrete_fields
        ):
            # Non-concrete relation (e.g. composite FK): gather the
            # values of its source fields.
            value = tuple(
                getattr(self, from_field) for from_field in field.from_fields
            )
            if len(value) == 1:
                value = value[0]
        elif field.concrete:
            value = getattr(self, field.attname)
        else:
            continue
        # Wrap plain values so everything in the map is an expression.
        if not value or not hasattr(value, "resolve_expression"):
            value = Value(value, field)
        field_map[field.name] = value
        field_map[field.attname] = value
    if "pk" not in exclude:
        field_map["pk"] = Value(self.pk, meta.pk)
    if generated_fields:
        # Substitute F() references in generated expressions with the
        # concrete values gathered above.
        replacements = {F(name): value for name, value in field_map.items()}
        for generated_field in generated_fields:
            field_map[generated_field.name] = ExpressionWrapper(
                generated_field.expression.replace_expressions(replacements),
                generated_field.output_field,
            )
    return field_map
def prepare_database_save(self, field):
    """
    Return the value representing this instance in a query against
    ``field`` (the attribute the relation points at). Refuse unsaved
    instances, which have no meaningful database identity.
    """
    if not self._is_pk_set():
        raise ValueError(
            "Unsaved model instance %r cannot be used in an ORM query." % self
        )
    related_field = field.remote_field.get_related_field()
    return getattr(self, related_field.attname)
def clean(self):
    """
    Hook for doing any extra model-wide validation after clean() has been
    called on every field by self.clean_fields. Any ValidationError raised
    by this method will not be associated with a particular field; it will
    have a special-case association with the field defined by
    NON_FIELD_ERRORS.

    Intentionally a no-op here; subclasses override it.
    """
def validate_unique(self, exclude=None):
    """
    Check unique constraints on the model and raise ValidationError if
    any failed, merging uniqueness and unique_for_date-style errors into
    a single error dict.
    """
    unique_checks, date_checks = self._get_unique_checks(exclude=exclude)
    errors = self._perform_unique_checks(unique_checks)
    for field_name, messages in self._perform_date_checks(date_checks).items():
        errors.setdefault(field_name, []).extend(messages)
    if errors:
        raise ValidationError(errors)
def _get_unique_checks(self, exclude=None, include_meta_constraints=False):
    """
    Return a list of checks to perform. Since validate_unique() could be
    called from a ModelForm, some fields may have been excluded; we can't
    perform a unique check on a model that is missing fields involved
    in that check. Fields that did not validate should also be excluded,
    but they need to be passed in via the exclude argument.

    Return a 2-tuple: (unique_checks, date_checks) where unique_checks
    is a list of (model_class, field_names) and date_checks is a list of
    (model_class, lookup_type, field_name, unique_for_field_name).
    """
    if exclude is None:
        exclude = set()
    unique_checks = []
    unique_togethers = [(self.__class__, self._meta.unique_together)]
    constraints = []
    if include_meta_constraints:
        constraints = [(self.__class__, self._meta.total_unique_constraints)]
    # Gather unique_together and (optionally) UniqueConstraints from all
    # MTI parents as well.
    for parent_class in self._meta.all_parents:
        if parent_class._meta.unique_together:
            unique_togethers.append(
                (parent_class, parent_class._meta.unique_together)
            )
        if include_meta_constraints and parent_class._meta.total_unique_constraints:
            constraints.append(
                (parent_class, parent_class._meta.total_unique_constraints)
            )
    for model_class, unique_together in unique_togethers:
        for check in unique_together:
            if not any(name in exclude for name in check):
                # Add the check if the field isn't excluded.
                unique_checks.append((model_class, tuple(check)))
    if include_meta_constraints:
        for model_class, model_constraints in constraints:
            for constraint in model_constraints:
                if not any(name in exclude for name in constraint.fields):
                    unique_checks.append((model_class, constraint.fields))
    # These are checks for the unique_for_<date/year/month>.
    date_checks = []
    # Gather a list of checks for fields declared as unique and add them to
    # the list of checks.
    fields_with_class = [(self.__class__, self._meta.local_fields)]
    for parent_class in self._meta.all_parents:
        fields_with_class.append((parent_class, parent_class._meta.local_fields))
    for model_class, fields in fields_with_class:
        for f in fields:
            name = f.name
            if name in exclude:
                continue
            if isinstance(f, CompositePrimaryKey):
                # A composite PK is unique across its member fields; only
                # check it if none of them is excluded.
                names = tuple(field.name for field in f.fields)
                if exclude.isdisjoint(names):
                    unique_checks.append((model_class, names))
                continue
            if f.unique:
                unique_checks.append((model_class, (name,)))
            if f.unique_for_date and f.unique_for_date not in exclude:
                date_checks.append((model_class, "date", name, f.unique_for_date))
            if f.unique_for_year and f.unique_for_year not in exclude:
                date_checks.append((model_class, "year", name, f.unique_for_year))
            if f.unique_for_month and f.unique_for_month not in exclude:
                date_checks.append((model_class, "month", name, f.unique_for_month))
    return unique_checks, date_checks
def _perform_unique_checks(self, unique_checks):
    """
    Execute each (model_class, field_names) unique check with a database
    query and return a dict mapping a field name (or NON_FIELD_ERRORS
    for multi-field checks) to a list of ValidationErrors.
    """
    errors = {}
    for model_class, unique_check in unique_checks:
        # Try to look up an existing object with the same values as this
        # object's values for all the unique field.
        lookup_kwargs = {}
        for field_name in unique_check:
            f = self._meta.get_field(field_name)
            lookup_value = getattr(self, f.attname)
            # TODO: Handle multiple backends with different feature flags.
            if lookup_value is None or (
                lookup_value == ""
                and connection.features.interprets_empty_strings_as_nulls
            ):
                # no value, skip the lookup
                continue
            if f in model_class._meta.pk_fields and not self._state.adding:
                # no need to check for unique primary key when editing
                continue
            lookup_kwargs[str(field_name)] = lookup_value
        # some fields were skipped, no reason to do the check
        if len(unique_check) != len(lookup_kwargs):
            continue
        qs = model_class._default_manager.filter(**lookup_kwargs)
        # Exclude the current object from the query if we are editing an
        # instance (as opposed to creating a new one)
        # Note that we need to use the pk as defined by model_class, not
        # self.pk. These can be different fields because model inheritance
        # allows single model to have effectively multiple primary keys.
        # Refs #17615.
        model_class_pk = self._get_pk_val(model_class._meta)
        if not self._state.adding and self._is_pk_set(model_class._meta):
            qs = qs.exclude(pk=model_class_pk)
        if qs.exists():
            # Single-field checks attach the error to the field itself;
            # multi-field checks use NON_FIELD_ERRORS.
            if len(unique_check) == 1:
                key = unique_check[0]
            else:
                key = NON_FIELD_ERRORS
            errors.setdefault(key, []).append(
                self.unique_error_message(model_class, unique_check)
            )
    return errors
def _perform_date_checks(self, date_checks):
    """
    Run unique_for_date/year/month checks and return a dict mapping a
    field name to the list of ValidationErrors raised for it.
    """
    errors = {}
    for model_class, lookup_type, field, unique_for in date_checks:
        # there's a ticket to add a date lookup, we can remove this special
        # case if that makes it's way in
        date = getattr(self, unique_for)
        if date is None:
            # No date value to be unique within; nothing to check.
            continue
        if lookup_type == "date":
            lookup_kwargs = {
                "%s__day" % unique_for: date.day,
                "%s__month" % unique_for: date.month,
                "%s__year" % unique_for: date.year,
            }
        else:
            lookup_kwargs = {
                "%s__%s" % (unique_for, lookup_type): getattr(date, lookup_type)
            }
        lookup_kwargs[field] = getattr(self, field)
        conflicts = model_class._default_manager.filter(**lookup_kwargs)
        # When editing an existing row, the row itself is not a conflict.
        if not self._state.adding and self._is_pk_set():
            conflicts = conflicts.exclude(pk=self.pk)
        if conflicts.exists():
            errors.setdefault(field, []).append(
                self.date_error_message(lookup_type, field, unique_for)
            )
    return errors
def date_error_message(self, lookup_type, field_name, unique_for):
    """
    Build the ValidationError reported when a unique_for_date/year/month
    check fails for ``field_name`` relative to the ``unique_for`` field.
    """
    opts = self._meta
    field = opts.get_field(field_name)
    params = {
        "model": self,
        "model_name": capfirst(opts.verbose_name),
        "lookup_type": lookup_type,
        "field": field_name,
        "field_label": capfirst(field.verbose_name),
        "date_field": unique_for,
        "date_field_label": capfirst(opts.get_field(unique_for).verbose_name),
    }
    return ValidationError(
        message=field.error_messages["unique_for_date"],
        code="unique_for_date",
        params=params,
    )
def unique_error_message(self, model_class, unique_check):
    """
    Build the ValidationError for a failed uniqueness check, using the
    field's own "unique" message for single-field checks and a generic
    unique_together message otherwise.
    """
    opts = model_class._meta
    params = {
        "model": self,
        "model_class": model_class,
        "model_name": capfirst(opts.verbose_name),
        "unique_check": unique_check,
    }
    # A unique field
    if len(unique_check) == 1:
        field = opts.get_field(unique_check[0])
        params["field_label"] = capfirst(field.verbose_name)
        return ValidationError(
            message=field.error_messages["unique"],
            code="unique",
            params=params,
        )
    # unique_together
    labels = [capfirst(opts.get_field(f).verbose_name) for f in unique_check]
    params["field_labels"] = get_text_list(labels, _("and"))
    return ValidationError(
        message=_("%(model_name)s with this %(field_labels)s already exists."),
        code="unique_together",
        params=params,
    )
def get_constraints(self):
    """
    Return (model_class, constraints) pairs for this model and every
    parent that declares constraints of its own.
    """
    gathered = [(self.__class__, self._meta.constraints)]
    for parent in self._meta.all_parents:
        parent_constraints = parent._meta.constraints
        if parent_constraints:
            gathered.append((parent, parent_constraints))
    return gathered
def validate_constraints(self, exclude=None):
    """
    Validate every constraint declared on this model and its parents and
    raise a ValidationError aggregating all failures.
    """
    constraint_groups = self.get_constraints()
    using = router.db_for_write(self.__class__, instance=self)
    errors = {}
    for model_class, model_constraints in constraint_groups:
        for constraint in model_constraints:
            try:
                constraint.validate(model_class, self, exclude=exclude, using=using)
            except ValidationError as e:
                is_single_field_unique = (
                    getattr(e, "code", None) == "unique"
                    and len(constraint.fields) == 1
                )
                if is_single_field_unique:
                    # Attach the error to the lone field rather than
                    # treating it as a non-field error.
                    errors.setdefault(constraint.fields[0], []).append(e)
                else:
                    errors = e.update_error_dict(errors)
    if errors:
        raise ValidationError(errors)
def full_clean(self, exclude=None, validate_unique=True, validate_constraints=True):
    """
    Call clean_fields(), clean(), validate_unique(), and
    validate_constraints() on the model. Raise a ValidationError for any
    errors that occur.

    ``exclude`` names fields to skip entirely; fields that fail an
    earlier stage are also excluded from the later uniqueness and
    constraint stages.
    """
    errors = {}
    if exclude is None:
        exclude = set()
    else:
        # Copy so the caller's collection isn't mutated below.
        exclude = set(exclude)
    try:
        self.clean_fields(exclude=exclude)
    except ValidationError as e:
        errors = e.update_error_dict(errors)
    # Form.clean() is run even if other validation fails, so do the
    # same with Model.clean() for consistency.
    try:
        self.clean()
    except ValidationError as e:
        errors = e.update_error_dict(errors)
    # Run unique checks, but only for fields that passed validation.
    if validate_unique:
        for name in errors:
            if name != NON_FIELD_ERRORS and name not in exclude:
                exclude.add(name)
        try:
            self.validate_unique(exclude=exclude)
        except ValidationError as e:
            errors = e.update_error_dict(errors)
    # Run constraints checks, but only for fields that passed validation.
    if validate_constraints:
        for name in errors:
            if name != NON_FIELD_ERRORS and name not in exclude:
                exclude.add(name)
        try:
            self.validate_constraints(exclude=exclude)
        except ValidationError as e:
            errors = e.update_error_dict(errors)
    if errors:
        raise ValidationError(errors)
def clean_fields(self, exclude=None):
    """
    Validate every field on the instance, storing each cleaned value
    back onto the instance, and raise a ValidationError mapping field
    names to error lists if any field fails.
    """
    if exclude is None:
        exclude = set()
    field_errors = {}
    for field in self._meta.fields:
        if field.name in exclude or field.generated:
            continue
        raw_value = getattr(self, field.attname)
        # Empty values on blank=True fields are the developer's
        # responsibility; skip validation for them.
        if field.blank and raw_value in field.empty_values:
            continue
        # A pending database default will be resolved by the database
        # itself, so there is nothing to validate yet.
        if isinstance(raw_value, DatabaseDefault):
            continue
        try:
            setattr(self, field.attname, field.clean(raw_value, self))
        except ValidationError as error:
            field_errors[field.name] = error.error_list
    if field_errors:
        raise ValidationError(field_errors)
def check(cls, **kwargs):
    """
    Run all system checks for this model and return the collected list
    of check messages (Errors/Warnings).

    Name-clash checks are run before column-clash checks so that a field
    name clash suppresses the (consequent) column clash noise.
    """
    errors = [
        *cls._check_swappable(),
        *cls._check_model(),
        *cls._check_managers(**kwargs),
    ]
    # Checks below only make sense for a model that is actually in use.
    if not cls._meta.swapped:
        databases = kwargs.get("databases") or []
        errors += [
            *cls._check_fields(**kwargs),
            *cls._check_m2m_through_same_relationship(),
            *cls._check_long_column_names(databases),
        ]
        clash_errors = (
            *cls._check_id_field(),
            *cls._check_field_name_clashes(),
            *cls._check_model_name_db_lookup_clashes(),
            *cls._check_property_name_related_field_accessor_clashes(),
            *cls._check_single_primary_key(),
        )
        errors.extend(clash_errors)
        # If there are field name clashes, hide consequent column name
        # clashes.
        if not clash_errors:
            errors.extend(cls._check_column_name_clashes())
        errors += [
            *cls._check_unique_together(),
            *cls._check_indexes(databases),
            *cls._check_ordering(),
            *cls._check_constraints(databases),
            *cls._check_db_table_comment(databases),
            *cls._check_composite_pk(),
        ]
    return errors
def _check_composite_pk(cls):
    """
    Validate the members of a CompositePrimaryKey: each must be an
    existing, local, concrete, non-null, non-generated field, and no two
    members may map to the same database column. Return models.E042
    errors for violations.
    """
    errors = []
    meta = cls._meta
    pk = meta.pk
    # Only applies to non-proxy models using a composite primary key.
    if meta.proxy or not isinstance(pk, CompositePrimaryKey):
        return errors
    seen_columns = defaultdict(list)
    for field_name in pk.field_names:
        hint = None
        try:
            field = meta.get_field(field_name)
        except FieldDoesNotExist:
            field = None
        if not field:
            hint = f"{field_name!r} is not a valid field."
        elif not field.column:
            hint = f"{field_name!r} field has no column."
        elif field.null:
            hint = f"{field_name!r} field may not set 'null=True'."
        elif field.generated:
            hint = f"{field_name!r} field is a generated field."
        elif field not in meta.local_fields:
            hint = f"{field_name!r} field is not a local field."
        else:
            # Valid so far; remember its column to detect duplicates.
            seen_columns[field.column].append(field_name)
        if hint:
            errors.append(
                checks.Error(
                    f"{field_name!r} cannot be included in the composite primary "
                    "key.",
                    hint=hint,
                    obj=cls,
                    id="models.E042",
                )
            )
    # Two member fields sharing one column is also invalid.
    for column, field_names in seen_columns.items():
        if len(field_names) > 1:
            field_name, *rest = field_names
            duplicates = ", ".join(repr(field) for field in rest)
            errors.append(
                checks.Error(
                    f"{duplicates} cannot be included in the composite primary "
                    "key.",
                    hint=f"{duplicates} and {field_name!r} are the same fields.",
                    obj=cls,
                    id="models.E042",
                )
            )
    return errors
def _check_db_table_comment(cls, databases):
    """
    Warn (models.W046) for each configured database that can't store
    table comments when this model declares db_table_comment.
    """
    if not cls._meta.db_table_comment:
        return []
    errors = []
    for db in databases:
        if not router.allow_migrate_model(db, cls):
            continue
        connection = connections[db]
        comments_supported = (
            connection.features.supports_comments
            or "supports_comments" in cls._meta.required_db_features
        )
        if not comments_supported:
            errors.append(
                checks.Warning(
                    f"{connection.display_name} does not support comments on "
                    f"tables (db_table_comment).",
                    obj=cls,
                    id="models.W046",
                )
            )
    return errors
def _check_swappable(cls):
    """Check if the swapped model exists."""
    if not cls._meta.swapped:
        return []
    errors = []
    try:
        apps.get_model(cls._meta.swapped)
    except ValueError:
        # The setting isn't even in "app_label.ModelName" form.
        errors.append(
            checks.Error(
                "'%s' is not of the form 'app_label.app_name'."
                % cls._meta.swappable,
                id="models.E001",
            )
        )
    except LookupError:
        # Well-formed, but the referenced model isn't installed.
        app_label, model_name = cls._meta.swapped.split(".")
        errors.append(
            checks.Error(
                "'%s' references '%s.%s', which has not been "
                "installed, or is abstract."
                % (cls._meta.swappable, app_label, model_name),
                id="models.E002",
            )
        )
    return errors
def _check_model(cls):
    """Flag proxy models that declare fields of their own (models.E017)."""
    if not cls._meta.proxy:
        return []
    if not (cls._meta.local_fields or cls._meta.local_many_to_many):
        return []
    return [
        checks.Error(
            "Proxy model '%s' contains model fields." % cls.__name__,
            id="models.E017",
        )
    ]
| def _check_managers(cls, **kwargs): | |
| """Perform all manager checks.""" | |
| errors = [] | |
| for manager in cls._meta.managers: | |
| errors.extend(manager.check(**kwargs)) | |
| return errors | |
| def _check_fields(cls, **kwargs): | |
| """Perform all field checks.""" | |
| errors = [] | |
| for field in cls._meta.local_fields: | |
| errors.extend(field.check(**kwargs)) | |
| for field in cls._meta.local_many_to_many: | |
| errors.extend(field.check(from_model=cls, **kwargs)) | |
| return errors | |
def _check_m2m_through_same_relationship(cls):
    """
    Check if no relationship model is used by more than one m2m field.

    Two m2m fields describe the same relationship when they share the
    target model, the intermediate model and through_fields.
    """
    errors = []
    seen_signatures = []
    for f in cls._meta.local_many_to_many:
        # Skip fields whose target model wasn't found.
        if not isinstance(f.remote_field.model, ModelBase):
            continue
        # Skip fields whose relationship (through) model wasn't found.
        if not isinstance(f.remote_field.through, ModelBase):
            continue
        signature = (
            f.remote_field.model,
            cls,
            f.remote_field.through,
            f.remote_field.through_fields,
        )
        if signature in seen_signatures:
            errors.append(
                checks.Error(
                    "The model has two identical many-to-many relations "
                    "through the intermediate model '%s'."
                    % f.remote_field.through._meta.label,
                    obj=cls,
                    id="models.E003",
                )
            )
        else:
            seen_signatures.append(signature)
    return errors
| def _check_id_field(cls): | |
| """Check if `id` field is a primary key.""" | |
| fields = [ | |
| f for f in cls._meta.local_fields if f.name == "id" and f != cls._meta.pk | |
| ] | |
| # fields is empty or consists of the invalid "id" field | |
| if fields and not fields[0].primary_key and cls._meta.pk.name == "id": | |
| return [ | |
| checks.Error( | |
| "'id' can only be used as a field name if the field also " | |
| "sets 'primary_key=True'.", | |
| obj=cls, | |
| id="models.E004", | |
| ) | |
| ] | |
| else: | |
| return [] | |
def _check_field_name_clashes(cls):
    """
    Forbid field shadowing in multi-table inheritance.

    Emits models.E005 when two parent models define clashing fields, and
    models.E006 when a parent-link or a locally defined field clashes with
    an inherited one. The `used_fields` dict is built up across four
    sequential passes, so the order of the loops below is significant.
    """
    errors = []
    used_fields = {}  # name or attname -> field
    # Check that multi-inheritance doesn't cause field name shadowing.
    for parent in cls._meta.all_parents:
        for f in parent._meta.local_fields:
            clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
            if clash:
                errors.append(
                    checks.Error(
                        "The field '%s' from parent model "
                        "'%s' clashes with the field '%s' "
                        "from parent model '%s'."
                        % (clash.name, clash.model._meta, f.name, f.model._meta),
                        obj=cls,
                        id="models.E005",
                    )
                )
            # Record under both name and attname (e.g. "author" and
            # "author_id") so either spelling triggers a clash later.
            used_fields[f.name] = f
            used_fields[f.attname] = f
    # Check that fields defined in the model don't clash with fields from
    # parents, including auto-generated fields like multi-table inheritance
    # child accessors.
    for parent in cls._meta.all_parents:
        for f in parent._meta.get_fields():
            # NOTE(review): used_fields is keyed by name strings, so this
            # membership test of a field object appears to always be true —
            # confirm whether a name-based check was intended here.
            if f not in used_fields:
                used_fields[f.name] = f
    # Check that parent links in diamond-shaped MTI models don't clash.
    for parent_link in cls._meta.parents.values():
        if not parent_link:
            continue
        clash = used_fields.get(parent_link.name) or None
        if clash:
            errors.append(
                checks.Error(
                    f"The field '{parent_link.name}' clashes with the field "
                    f"'{clash.name}' from model '{clash.model._meta}'.",
                    obj=cls,
                    id="models.E006",
                )
            )
    # Finally, check the model's own local fields against everything
    # collected from the parents above.
    for f in cls._meta.local_fields:
        clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
        # Note that we may detect clash between user-defined non-unique
        # field "id" and automatically added unique field "id", both
        # defined at the same model. This special case is considered in
        # _check_id_field and here we ignore it.
        id_conflict = (
            f.name == "id" and clash and clash.name == "id" and clash.model == cls
        )
        if clash and not id_conflict:
            errors.append(
                checks.Error(
                    "The field '%s' clashes with the field '%s' "
                    "from model '%s'." % (f.name, clash.name, clash.model._meta),
                    obj=f,
                    id="models.E006",
                )
            )
        used_fields[f.name] = f
        used_fields[f.attname] = f
    return errors
| def _check_column_name_clashes(cls): | |
| # Store a list of column names which have already been used by other | |
| # fields. | |
| used_column_names = [] | |
| errors = [] | |
| for f in cls._meta.local_fields: | |
| column_name = f.column | |
| # Ensure the column name is not already in use. | |
| if column_name and column_name in used_column_names: | |
| errors.append( | |
| checks.Error( | |
| "Field '%s' has column name '%s' that is used by " | |
| "another field." % (f.name, column_name), | |
| hint="Specify a 'db_column' for the field.", | |
| obj=cls, | |
| id="models.E007", | |
| ) | |
| ) | |
| else: | |
| used_column_names.append(column_name) | |
| return errors | |
def _check_model_name_db_lookup_clashes(cls):
    """
    Ensure the model's class name can't be confused with query lookup
    syntax: no leading/trailing underscore (models.E023) and no double
    underscore (models.E024).
    """
    model_name = cls.__name__
    if model_name.startswith("_") or model_name.endswith("_"):
        return [
            checks.Error(
                "The model name '%s' cannot start or end with an underscore "
                "as it collides with the query lookup syntax." % model_name,
                obj=cls,
                id="models.E023",
            )
        ]
    if LOOKUP_SEP in model_name:
        return [
            checks.Error(
                "The model name '%s' cannot contain double underscores as "
                "it collides with the query lookup syntax." % model_name,
                obj=cls,
                id="models.E024",
            )
        ]
    return []
| def _check_property_name_related_field_accessor_clashes(cls): | |
| errors = [] | |
| property_names = cls._meta._property_names | |
| related_field_accessors = ( | |
| f.attname | |
| for f in cls._meta._get_fields(reverse=False) | |
| if f.is_relation and f.related_model is not None | |
| ) | |
| for accessor in related_field_accessors: | |
| if accessor in property_names: | |
| errors.append( | |
| checks.Error( | |
| "The property '%s' clashes with a related field " | |
| "accessor." % accessor, | |
| obj=cls, | |
| id="models.E025", | |
| ) | |
| ) | |
| return errors | |
| def _check_single_primary_key(cls): | |
| errors = [] | |
| if sum(1 for f in cls._meta.local_fields if f.primary_key) > 1: | |
| errors.append( | |
| checks.Error( | |
| "The model cannot have more than one field with " | |
| "'primary_key=True'.", | |
| obj=cls, | |
| id="models.E026", | |
| ) | |
| ) | |
| return errors | |
| def _check_unique_together(cls): | |
| """Check the value of "unique_together" option.""" | |
| if not isinstance(cls._meta.unique_together, (tuple, list)): | |
| return [ | |
| checks.Error( | |
| "'unique_together' must be a list or tuple.", | |
| obj=cls, | |
| id="models.E010", | |
| ) | |
| ] | |
| elif any( | |
| not isinstance(fields, (tuple, list)) | |
| for fields in cls._meta.unique_together | |
| ): | |
| return [ | |
| checks.Error( | |
| "All 'unique_together' elements must be lists or tuples.", | |
| obj=cls, | |
| id="models.E011", | |
| ) | |
| ] | |
| else: | |
| errors = [] | |
| for fields in cls._meta.unique_together: | |
| errors.extend(cls._check_local_fields(fields, "unique_together")) | |
| return errors | |
| def _check_indexes(cls, databases): | |
| errors = [] | |
| for db in databases: | |
| if not router.allow_migrate_model(db, cls): | |
| continue | |
| connection = connections[db] | |
| for index in cls._meta.indexes: | |
| errors.extend(index.check(cls, connection)) | |
| return errors | |
def _check_local_fields(cls, fields, option):
    """
    Validate that every name in `fields` refers to a usable local field.

    `option` is the Meta option the names came from (e.g.
    "unique_together") and is interpolated into the error messages.
    Produces models.E012 (nonexistent), E013 (M2M not permitted),
    E048 (composite pk not permitted), E049 (multi-column ForeignObject),
    or E016 (field inherited via MTI, not local).
    """
    from django.db import models

    # In order to avoid hitting the relation tree prematurely, we use our
    # own fields_map instead of using get_field()
    forward_fields_map = {}
    for field in cls._meta._get_fields(reverse=False):
        forward_fields_map[field.name] = field
        if hasattr(field, "attname"):
            # Allow referring to a relation by its attname too
            # (e.g. "author_id" as well as "author").
            forward_fields_map[field.attname] = field
    errors = []
    for field_name in fields:
        try:
            field = forward_fields_map[field_name]
        except KeyError:
            errors.append(
                checks.Error(
                    "'%s' refers to the nonexistent field '%s'."
                    % (
                        option,
                        field_name,
                    ),
                    obj=cls,
                    id="models.E012",
                )
            )
        else:
            if isinstance(field.remote_field, models.ManyToManyRel):
                errors.append(
                    checks.Error(
                        "'%s' refers to a ManyToManyField '%s', but "
                        "ManyToManyFields are not permitted in '%s'."
                        % (
                            option,
                            field_name,
                            option,
                        ),
                        obj=cls,
                        id="models.E013",
                    )
                )
            elif isinstance(field, models.CompositePrimaryKey):
                errors.append(
                    checks.Error(
                        f"{option!r} refers to a CompositePrimaryKey "
                        f"{field_name!r}, but CompositePrimaryKeys are not "
                        f"permitted in {option!r}.",
                        obj=cls,
                        id="models.E048",
                    )
                )
            elif (
                isinstance(field.remote_field, ForeignObjectRel)
                and field not in cls._meta.local_concrete_fields
                and len(field.from_fields) > 1
            ):
                errors.append(
                    checks.Error(
                        f"{option!r} refers to a ForeignObject {field_name!r} with "
                        "multiple 'from_fields', which is not supported for that "
                        "option.",
                        obj=cls,
                        id="models.E049",
                    )
                )
            elif field not in cls._meta.local_fields:
                errors.append(
                    checks.Error(
                        "'%s' refers to field '%s' which is not local to model "
                        "'%s'." % (option, field_name, cls._meta.object_name),
                        hint="This issue may be caused by multi-table inheritance.",
                        obj=cls,
                        id="models.E016",
                    )
                )
    return errors
def _check_ordering(cls):
    """
    Check "ordering" option -- is it a list of strings and do all fields
    exist?

    Produces models.E021 (clash with order_with_respect_to), E014 (not a
    list/tuple), and E015 (nonexistent field, related field, or lookup).
    """
    if cls._meta._ordering_clash:
        return [
            checks.Error(
                "'ordering' and 'order_with_respect_to' cannot be used together.",
                obj=cls,
                id="models.E021",
            ),
        ]
    # Nothing else to validate when ordering is implicit or absent.
    if cls._meta.order_with_respect_to or not cls._meta.ordering:
        return []
    if not isinstance(cls._meta.ordering, (list, tuple)):
        return [
            checks.Error(
                "'ordering' must be a tuple or list (even if you want to order by "
                "only one field).",
                obj=cls,
                id="models.E014",
            )
        ]
    errors = []
    fields = cls._meta.ordering
    # Skip expressions and '?' fields.
    fields = (f for f in fields if isinstance(f, str) and f != "?")
    # Convert "-field" to "field".
    fields = (f.removeprefix("-") for f in fields)
    # Separate related fields and non-related fields.
    _fields = []
    related_fields = []
    for f in fields:
        if LOOKUP_SEP in f:
            related_fields.append(f)
        else:
            _fields.append(f)
    fields = _fields
    # Check related fields: walk each "a__b__c" path, following relations
    # from model to model one part at a time.
    for field in related_fields:
        _cls = cls
        fld = None
        for part in field.split(LOOKUP_SEP):
            try:
                # pk is an alias that won't be found by opts.get_field.
                if part == "pk":
                    fld = _cls._meta.pk
                else:
                    fld = _cls._meta.get_field(part)
                if fld.is_relation:
                    _cls = fld.path_infos[-1].to_opts.model
                else:
                    # A non-relational field ends the chain; the next part
                    # (if any) can only be a transform/lookup, handled in
                    # the except branch via the AttributeError on _cls.
                    _cls = None
            except (FieldDoesNotExist, AttributeError):
                # The unresolvable part may still be a valid transform or
                # lookup registered on the previously resolved field.
                if fld is None or (
                    fld.get_transform(part) is None and fld.get_lookup(part) is None
                ):
                    errors.append(
                        checks.Error(
                            "'ordering' refers to the nonexistent field, "
                            "related field, or lookup '%s'." % field,
                            obj=cls,
                            id="models.E015",
                        )
                    )
    # Skip ordering on pk. This is always a valid order_by field
    # but is an alias and therefore won't be found by opts.get_field.
    fields = {f for f in fields if f != "pk"}
    # Check for invalid or nonexistent fields in ordering.
    invalid_fields = []
    # Any field name that is not present in field_names does not exist.
    # Also, ordering by m2m fields is not allowed.
    opts = cls._meta
    valid_fields = set(
        chain.from_iterable(
            (
                (f.name, f.attname)
                if not (f.auto_created and not f.concrete)
                else (f.field.related_query_name(),)
            )
            for f in chain(opts.fields, opts.related_objects)
        )
    )
    invalid_fields.extend(fields - valid_fields)
    for invalid_field in invalid_fields:
        errors.append(
            checks.Error(
                "'ordering' refers to the nonexistent field, related "
                "field, or lookup '%s'." % invalid_field,
                obj=cls,
                id="models.E015",
            )
        )
    return errors
def _check_long_column_names(cls, databases):
    """
    Check that any auto-generated column names are shorter than the limits
    for each database in which the model will be created.

    Produces models.E018 for regular fields and models.E019 for columns on
    an auto-created M2M intermediate table. Only columns without an
    explicit db_column are checked, since those are the auto-generated
    ones.
    """
    if not databases:
        return []
    errors = []
    allowed_len = None
    db_alias = None
    # Find the minimum max allowed length among all specified db_aliases.
    for db in databases:
        # skip databases where the model won't be created
        if not router.allow_migrate_model(db, cls):
            continue
        connection = connections[db]
        max_name_length = connection.ops.max_name_length()
        # Backends that truncate names themselves (or have no limit)
        # don't constrain the allowed length.
        if max_name_length is None or connection.features.truncates_names:
            continue
        else:
            if allowed_len is None:
                allowed_len = max_name_length
                db_alias = db
            elif max_name_length < allowed_len:
                allowed_len = max_name_length
                db_alias = db
    # No applicable database imposes a limit; nothing to check.
    if allowed_len is None:
        return errors
    for f in cls._meta.local_fields:
        # Check if auto-generated name for the field is too long
        # for the database.
        if (
            f.db_column is None
            and (column_name := f.column) is not None
            and len(column_name) > allowed_len
        ):
            errors.append(
                checks.Error(
                    'Autogenerated column name too long for field "%s". '
                    'Maximum length is "%s" for database "%s".'
                    % (column_name, allowed_len, db_alias),
                    hint="Set the column name manually using 'db_column'.",
                    obj=cls,
                    id="models.E018",
                )
            )
    for f in cls._meta.local_many_to_many:
        # Skip nonexistent models.
        if isinstance(f.remote_field.through, str):
            continue
        # Check if auto-generated name for the M2M field is too long
        # for the database.
        for m2m in f.remote_field.through._meta.local_fields:
            if (
                m2m.db_column is None
                and (rel_name := m2m.column) is not None
                and len(rel_name) > allowed_len
            ):
                errors.append(
                    checks.Error(
                        "Autogenerated column name too long for M2M field "
                        '"%s". Maximum length is "%s" for database "%s".'
                        % (rel_name, allowed_len, db_alias),
                        hint=(
                            "Use 'through' to create a separate model for "
                            "M2M and then set column_name using 'db_column'."
                        ),
                        obj=cls,
                        id="models.E019",
                    )
                )
    return errors
def _get_expr_references(cls, expr):
    """
    Recursively yield every field reference used by `expr`, each as a
    tuple of lookup parts (the reference split on LOOKUP_SEP).
    """
    if isinstance(expr, Q):
        # Q children are either (lookup, value) pairs or nested Q/expressions.
        for child in expr.children:
            if not isinstance(child, tuple):
                yield from cls._get_expr_references(child)
                continue
            lookup, value = child
            yield tuple(lookup.split(LOOKUP_SEP))
            yield from cls._get_expr_references(value)
    elif isinstance(expr, F):
        yield tuple(expr.name.split(LOOKUP_SEP))
    elif hasattr(expr, "get_source_expressions"):
        for source in expr.get_source_expressions():
            yield from cls._get_expr_references(source)
| def _check_constraints(cls, databases): | |
| errors = [] | |
| for db in databases: | |
| if not router.allow_migrate_model(db, cls): | |
| continue | |
| connection = connections[db] | |
| for constraint in cls._meta.constraints: | |
| errors.extend(constraint.check(cls, connection)) | |
| return errors | |
| ############################################ | |
| # HELPER FUNCTIONS (CURRIED MODEL METHODS) # | |
| ############################################ | |
| # ORDERING METHODS ######################### | |
def method_set_order(self, ordered_obj, id_list, using=None):
    """
    Bulk-write the _order column so that the related `ordered_obj` rows
    follow the sequence given by `id_list` (curried onto models using
    order_with_respect_to as set_<modelname>_order).
    """
    rel_field = ordered_obj._meta.order_with_respect_to
    lookup = rel_field.get_forward_related_filter(self)
    manager = ordered_obj.objects.db_manager(using)
    stand_ins = [
        ordered_obj(pk=pk, _order=index) for index, pk in enumerate(id_list)
    ]
    manager.filter(**lookup).bulk_update(stand_ins, ["_order"])
def method_get_order(self, ordered_obj):
    """
    Return the primary keys of the related `ordered_obj` rows in their
    stored _order (curried onto models using order_with_respect_to as
    get_<modelname>_order).
    """
    rel_field = ordered_obj._meta.order_with_respect_to
    lookup = rel_field.get_forward_related_filter(self)
    pk_name = ordered_obj._meta.pk.name
    queryset = ordered_obj.objects.filter(**lookup)
    return queryset.values_list(pk_name, flat=True)
def make_foreign_order_accessors(model, related_model):
    """
    Install get_<modelname>_order / set_<modelname>_order helpers on
    `related_model`, each pre-bound to `model` via partialmethod.
    """
    lowered = model.__name__.lower()
    accessors = (
        ("get_%s_order" % lowered, method_get_order),
        ("set_%s_order" % lowered, method_set_order),
    )
    for attr_name, func in accessors:
        setattr(related_model, attr_name, partialmethod(func, model))
| ######## | |
| # MISC # | |
| ######## | |
def model_unpickle(model_id):
    """Used to unpickle Model subclasses with deferred fields."""
    if isinstance(model_id, tuple):
        # (app_label, model_name) pair: resolve through the app registry.
        model_class = apps.get_model(*model_id)
    else:
        # Backwards compat - the model was cached directly in earlier versions.
        model_class = model_id
    # Create a bare instance without running __init__; the unpickler will
    # restore state afterwards.
    return model_class.__new__(model_class)
model_unpickle.__safe_for_unpickle__ = True
Xet Storage Details
- Size:
- 97.7 kB
- Xet hash:
- e08f6b35e72f1c0cc2220b6e063ddf6db71f84d9c37cf22ec4f3d7cc1ee439d1
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.