Spaces:
Runtime error
Runtime error
| from collections import defaultdict | |
| from contextlib import contextmanager | |
| from copy import deepcopy | |
| from logging import warning | |
| import numpy as np | |
| import pandas as pd | |
| from aif360.datasets import Dataset | |
| class StructuredDataset(Dataset): | |
| """Base class for all structured datasets. | |
| A StructuredDataset requires data to be stored in :obj:`numpy.ndarray` | |
| objects with :obj:`~numpy.dtype` as :obj:`~numpy.float64`. | |
| Attributes: | |
| features (numpy.ndarray): Dataset features for each instance. | |
| labels (numpy.ndarray): Generic label corresponding to each instance | |
| (could be ground-truth, predicted, cluster assignments, etc.). | |
| scores (numpy.ndarray): Probability score associated with each label. | |
| Same shape as `labels`. Only valid for binary labels (this includes | |
| one-hot categorical labels as well). | |
| protected_attributes (numpy.ndarray): A subset of `features` for which | |
| fairness is desired. | |
| feature_names (list(str)): Names describing each dataset feature. | |
| label_names (list(str)): Names describing each label. | |
| protected_attribute_names (list(str)): A subset of `feature_names` | |
| corresponding to `protected_attributes`. | |
| privileged_protected_attributes (list(numpy.ndarray)): A subset of | |
| protected attribute values which are considered privileged from a | |
| fairness perspective. | |
| unprivileged_protected_attributes (list(numpy.ndarray)): The remaining | |
| possible protected attribute values which are not included in | |
| `privileged_protected_attributes`. | |
| instance_names (list(str)): Identifiers for each instance. Sequential | |
| integers by default. | |
| instance_weights (numpy.ndarray): Weighting for each instance. All | |
| equal (ones) by default. Pursuant to standard practice in social | |
| science data, 1 means one person or entity. These weights are hence | |
| person or entity multipliers (see: | |
| https://www.ibm.com/support/knowledgecenter/en/SS3RA7_15.0.0/com.ibm.spss.modeler.help/netezza_decisiontrees_weights.htm) | |
| These weights *may not* be normalized to sum to 1 across the entire | |
| dataset, rather the nominal (default) weight of each entity/record | |
| in the data is 1. This is similar in spirit to the person weight in | |
| census microdata samples. | |
| https://www.census.gov/programs-surveys/acs/technical-documentation/pums/about.html | |
| ignore_fields (set(str)): Attribute names to ignore when doing equality | |
| comparisons. Always at least contains `'metadata'`. | |
| metadata (dict): Details about the creation of this dataset. For | |
| example:: | |
| { | |
| 'transformer': 'Dataset.__init__', | |
| 'params': kwargs, | |
| 'previous': None | |
| } | |
| """ | |
def __init__(self, df, label_names, protected_attribute_names,
             instance_weights_name=None, scores_names=[],
             unprivileged_protected_attributes=[],
             privileged_protected_attributes=[], metadata=None):
    """Populate the dataset's arrays and name lists from a DataFrame.

    Args:
        df (pandas.DataFrame): Input DataFrame with features, labels, and
            protected attributes. Values should be preprocessed
            to remove NAs and make all data numerical. Index values are
            taken as instance names. Its column labels are converted to
            strings in place.
        label_names (iterable): Names of the label columns in `df`.
        protected_attribute_names (iterable): List of names corresponding to
            protected attribute columns in `df`.
        instance_weights_name (optional): Column name in `df` corresponding
            to instance weights. If not provided, `instance_weights` will be
            all set to 1.
        scores_names (optional): Names of the score columns in `df`. If
            empty, `scores` is a copy of `labels`.
        unprivileged_protected_attributes (optional): Used only when
            `privileged_protected_attributes` is given as well; otherwise
            all but the highest numerical value of each protected attribute
            are considered not privileged.
        privileged_protected_attributes (optional): Used only when
            `unprivileged_protected_attributes` is given as well; otherwise
            the highest numerical value of each protected attribute is
            considered privileged.
        metadata (optional): Additional metadata, forwarded to the base
            class initializer.

    Raises:
        TypeError: If `df` is None. The base class initializer may raise
            further errors (not visible from this method).
    """
    if df is None:
        raise TypeError("Must provide a pandas DataFrame representing "
                        "the data (features, labels, protected attributes)")

    # NOTE: `df` is not checked here for NA or non-numeric values.

    # Convert all column names to strings. This renames the columns of the
    # caller's DataFrame in place.
    df.columns = df.columns.astype(str).tolist()

    # Every column reference has to be a string as well, otherwise the
    # lookups below miss the renamed columns.
    label_names = list(map(str, label_names))
    protected_attribute_names = list(map(str, protected_attribute_names))
    if scores_names and not isinstance(scores_names, str):
        scores_names = list(map(str, scores_names))
    if instance_weights_name:
        instance_weights_name = str(instance_weights_name)

    # Features are all columns that are not labels, scores or weights.
    self.feature_names = [
        n for n in df.columns
        if n not in label_names
        and (not scores_names or n not in scores_names)
        and n != instance_weights_name]
    self.label_names = label_names
    self.features = df[self.feature_names].values.copy()
    self.labels = df[self.label_names].values.copy()
    self.instance_names = df.index.astype(str).tolist()

    if scores_names:
        self.scores = df[scores_names].values.copy()
    else:
        self.scores = self.labels.copy()

    df_prot = df.loc[:, protected_attribute_names]
    self.protected_attribute_names = df_prot.columns.astype(str).tolist()
    self.protected_attributes = df_prot.values.copy()

    # Infer the privileged and unprivileged values if not provided
    if unprivileged_protected_attributes and privileged_protected_attributes:
        self.unprivileged_protected_attributes = unprivileged_protected_attributes
        self.privileged_protected_attributes = privileged_protected_attributes
    else:
        # np.unique returns the distinct values in ascending order, so the
        # last entry is the highest (privileged) value.
        levels = [np.unique(df_prot[attr].values)
                  for attr in self.protected_attribute_names]
        self.unprivileged_protected_attributes = [lv[:-1] for lv in levels]
        self.privileged_protected_attributes = [lv[-1:] for lv in levels]

    if instance_weights_name:
        self.instance_weights = df[instance_weights_name].values.copy()
    else:
        self.instance_weights = np.ones_like(self.instance_names,
                                             dtype=np.float64)

    # always ignore metadata and ignore_fields
    self.ignore_fields = {'metadata', 'ignore_fields'}

    # sets metadata
    super(StructuredDataset, self).__init__(
        df=df, label_names=label_names,
        protected_attribute_names=protected_attribute_names,
        instance_weights_name=instance_weights_name,
        unprivileged_protected_attributes=unprivileged_protected_attributes,
        privileged_protected_attributes=privileged_protected_attributes,
        metadata=metadata)
def subset(self, indexes):
    """Return a copy of this dataset restricted to the given row positions.

    Args:
        indexes: Iterable of integer row positions (list, tuple, generator
            or :obj:`numpy.ndarray`).

    Returns:
        `StructuredDataset`: The result of `self.copy()` with
        `instance_names`, `features`, `labels`, `instance_weights`,
        `protected_attributes` and `scores` replaced by the selected rows.
    """
    # `indexes` is consumed twice (name lookup and NumPy indexing). Turning
    # it into a list keeps a generator from being exhausted after the first
    # pass and keeps a tuple from being read as a multi-dimensional index.
    if not isinstance(indexes, np.ndarray):
        indexes = list(indexes)

    subset = self.copy()
    subset.instance_names = [self.instance_names[i] for i in indexes]
    subset.features = self.features[indexes]
    subset.labels = self.labels[indexes]
    subset.instance_weights = self.instance_weights[indexes]
    subset.protected_attributes = self.protected_attributes[indexes]
    subset.scores = self.scores[indexes]
    return subset
def __eq__(self, other):
    """Equality comparison for StructuredDatasets.

    Compares every instance attribute of `self` with the attribute of the
    same name on `other`, skipping the names listed in `ignore_fields`.

    Args:
        other: Object to compare against.

    Returns:
        bool: False if `other` is not a StructuredDataset, lacks one of the
        compared attributes, or any compared attribute differs.
    """
    if not isinstance(other, StructuredDataset):
        return False

    def _eq(x, y):
        if isinstance(x, np.ndarray) or isinstance(y, np.ndarray):
            # array_equal reports a shape mismatch as False; a plain `==`
            # would broadcast or raise.
            return bool(np.array_equal(x, y))
        if isinstance(x, list) and isinstance(y, list):
            return len(x) == len(y) and all(_eq(xi, yi)
                                            for xi, yi in zip(x, y))
        return x == y

    theirs = other.__dict__
    return all(k in theirs and _eq(v, theirs[k])
               for k, v in self.__dict__.items()
               if k not in self.ignore_fields)
def __ne__(self, other):
    """Return the logical negation of ``self == other``."""
    is_equal = self == other
    return not is_equal
def __repr__(self):
    """Return the same text that ``str(self)`` produces."""
    text = str(self)
    return text
def __str__(self):
    """Render the dataset as a table with a three-level column header.

    The frame returned by `convert_to_dataframe()` gets the instance
    weights inserted as its first column. The header levels are: the group
    ('instance weights', 'features' or 'labels'), a 'protected attribute'
    marker for protected features, and the feature name.
    """
    frame, _ = self.convert_to_dataframe()
    frame.insert(0, 'instance_weights', self.instance_weights)

    n_features = len(self.feature_names)
    n_labels = len(self.label_names)
    label_blanks = [''] * n_labels

    # One header entry per column: weights first, then features, then labels.
    group_row = (['instance weights']
                 + ['features'] * n_features
                 + ['labels'] * n_labels)
    marker_row = ['']
    for feature in self.feature_names:
        if feature in self.protected_attribute_names:
            marker_row.append('protected attribute')
        else:
            marker_row.append('')
    marker_row = marker_row + label_blanks
    name_row = [''] + self.feature_names + label_blanks

    frame.columns = pd.MultiIndex.from_arrays(
        [group_row, marker_row, name_row])
    frame.index.name = 'instance names'
    return str(frame)
| # TODO: *_names checks | |
def validate_dataset(self):
    """Error checking and type validation.

    Runs the base class validation, then checks the array attributes.
    `features`, `protected_attributes`, `labels` and `instance_weights` are
    cast to float64 (`scores` is not), and a one-dimensional `labels` is
    reshaped into a single column.

    Raises:
        TypeError: Certain fields must be np.ndarrays as specified in the
            class description.
        ValueError: ndarray shapes must match, or the privileged and
            unprivileged value sets overlap or do not cover every observed
            protected attribute value.
    """
    super(StructuredDataset, self).validate_dataset()

    # =========================== TYPE CHECKING ============================
    # Look the arrays up by name so the error can say which attribute is
    # wrong; the offending value itself carries no usable `__name__`.
    for name in ('features', 'protected_attributes', 'labels', 'scores',
                 'instance_weights'):
        if not isinstance(getattr(self, name), np.ndarray):
            raise TypeError("'{}' must be an np.ndarray.".format(name))

    # convert ndarrays to float64
    self.features = self.features.astype(np.float64)
    self.protected_attributes = self.protected_attributes.astype(np.float64)
    self.labels = self.labels.astype(np.float64)
    self.instance_weights = self.instance_weights.astype(np.float64)

    # =========================== SHAPE CHECKING ===========================
    if len(self.labels.shape) == 1:
        self.labels = self.labels.reshape((-1, 1))
    try:
        # Only a compatibility check: the reshaped array is discarded.
        self.scores.reshape(self.labels.shape)
    except ValueError as e:
        print("ValueError: {}".format(e))
        raise ValueError("'scores' should have the same shape as 'labels'.")
    if not self.labels.shape[0] == self.features.shape[0]:
        raise ValueError("Number of labels must match number of instances:"
            "\n\tlabels.shape = {}\n\tfeatures.shape = {}".format(
                self.labels.shape, self.features.shape))
    if not self.instance_weights.shape[0] == self.features.shape[0]:
        raise ValueError("Number of weights must match number of instances:"
            "\n\tinstance_weights.shape = {}\n\tfeatures.shape = {}".format(
                self.instance_weights.shape, self.features.shape))

    # =========================== VALUE CHECKING ===========================
    if np.any(np.logical_or(self.scores < 0., self.scores > 1.)):
        warning("'scores' has no well-defined meaning out of range [0, 1].")

    for i, priv_values in enumerate(self.privileged_protected_attributes):
        priv = set(priv_values)
        unpriv = set(self.unprivileged_protected_attributes[i])
        designated = priv | unpriv
        observed = set(self.protected_attributes[:, i])

        # check for duplicates
        if priv & unpriv:
            raise ValueError("'privileged_protected_attributes' and "
                "'unprivileged_protected_attributes' should not share any "
                "common elements:\n\tBoth contain {} for feature {}".format(
                    list(priv & unpriv), self.protected_attribute_names[i]))

        # check for unclassified values
        if not observed <= designated:
            raise ValueError("All observed values for protected attributes "
                "should be designated as either privileged or unprivileged:"
                "\n\t{} not designated for feature {}".format(
                    list(observed - designated),
                    self.protected_attribute_names[i]))

        # warn for unobserved values
        if not designated <= observed:
            warning("{} listed but not observed for feature {}".format(
                list(designated - observed),
                self.protected_attribute_names[i]))
@contextmanager
def temporarily_ignore(self, *fields):
    """Temporarily add the fields provided to `ignore_fields`.

    To be used in a `with` statement. Upon completing the `with` block,
    `ignore_fields` is restored to its original value, also when the block
    raises.

    Args:
        *fields: Additional fields to ignore for equality comparison within
            the scope of this context manager, e.g.
            `temporarily_ignore('features', 'labels')`. The temporary
            `ignore_fields` attribute is the union of the old attribute and
            the set of these fields.

    Examples:
        >>> sd = StructuredDataset(...)
        >>> modified = sd.copy()
        >>> modified.labels = sd.labels + 1
        >>> assert sd != modified
        >>> with sd.temporarily_ignore('labels'):
        ...     assert sd == modified
        >>> assert 'labels' not in sd.ignore_fields
    """
    # Snapshot first: the in-place union below mutates the current set.
    old_ignore = deepcopy(self.ignore_fields)
    self.ignore_fields |= set(fields)
    try:
        yield
    finally:
        self.ignore_fields = old_ignore
def align_datasets(self, other):
    """Reorder the columns of `other` to follow this dataset's name order.

    Args:
        other (StructuredDataset): Other dataset that needs to be aligned.

    Returns:
        StructuredDataset: A copy of `other` (via `other.copy()`) whose
        features, labels, scores and protected attributes, together with
        the privileged/unprivileged value lists, are ordered like this
        dataset's `feature_names`, `label_names` and
        `protected_attribute_names`.

    Raises:
        ValueError: If the two datasets do not use the same sets of feature,
            label and protected attribute names.
    """
    same_names = (
        set(self.feature_names) == set(other.feature_names)
        and set(self.label_names) == set(other.label_names)
        and set(self.protected_attribute_names)
        == set(other.protected_attribute_names))
    if not same_names:
        raise ValueError(
            "feature_names, label_names, and protected_attribute_names "
            "should match between this and other dataset.")

    aligned = other.copy()

    # For each name in this dataset's order, find its column in `other`.
    feature_cols = list(map(aligned.feature_names.index, self.feature_names))
    label_cols = list(map(aligned.label_names.index, self.label_names))
    prot_cols = list(map(aligned.protected_attribute_names.index,
                         self.protected_attribute_names))

    aligned.features = aligned.features[:, feature_cols]
    aligned.labels = aligned.labels[:, label_cols]
    aligned.scores = aligned.scores[:, label_cols]
    aligned.protected_attributes = aligned.protected_attributes[:, prot_cols]

    priv_values = aligned.privileged_protected_attributes
    unpriv_values = aligned.unprivileged_protected_attributes
    aligned.privileged_protected_attributes = [
        priv_values[col] for col in prot_cols]
    aligned.unprivileged_protected_attributes = [
        unpriv_values[col] for col in prot_cols]

    aligned.feature_names = deepcopy(self.feature_names)
    aligned.label_names = deepcopy(self.label_names)
    aligned.protected_attribute_names = deepcopy(
        self.protected_attribute_names)
    return aligned
| # TODO: Should we store the protected attributes as a separate dataframe | |
def convert_to_dataframe(self, de_dummy_code=False, sep='=',
                         set_category=True):
    """Convert the StructuredDataset to a :obj:`pandas.DataFrame`.

    Args:
        de_dummy_code (bool): If `True`, dummy-coded columns are collapsed
            back into single columns via `_de_dummy_code_df`. Label and
            protected attribute values are then also replaced using
            `metadata['label_maps']` and
            `metadata['protected_attribute_maps']` when those keys exist.
        sep (char): Separator between the prefix in the dummy indicators and
            the dummy-coded categorical levels.
        set_category (bool): Set the de-dummy coded features to categorical
            type.

    Returns:
        (pandas.DataFrame, dict):

            * `pandas.DataFrame`: Features followed by labels, indexed by
              `instance_names`. The `protected_attributes` field in the
              dataset overrides the values in the `features` field.
            * `dict`: Attributes. Holds `feature_names`, `label_names`,
              `protected_attribute_names`, `instance_names`,
              `instance_weights`, `privileged_protected_attributes` and
              `unprivileged_protected_attributes`. The metadata is not
              returned.
    """
    column_names = self.feature_names + self.label_names
    values = np.hstack((self.features, self.labels))
    frame = pd.DataFrame(values, columns=column_names,
                         index=self.instance_names)
    # Protected attribute columns take their values from the dedicated array.
    frame.loc[:, self.protected_attribute_names] = self.protected_attributes

    # De-dummy code if necessary
    if de_dummy_code:
        frame = self._de_dummy_code_df(frame, sep=sep,
                                       set_category=set_category)
        if 'label_maps' in self.metadata:
            for pos, label in enumerate(self.label_names):
                mapping = self.metadata['label_maps'][pos]
                frame[label] = frame[label].replace(mapping)
        if 'protected_attribute_maps' in self.metadata:
            for pos, prot_attr in enumerate(self.protected_attribute_names):
                mapping = self.metadata['protected_attribute_maps'][pos]
                frame[prot_attr] = frame[prot_attr].replace(mapping)

    # Attributes
    exported = (
        "feature_names",
        "label_names",
        "protected_attribute_names",
        "instance_names",
        "instance_weights",
        "privileged_protected_attributes",
        "unprivileged_protected_attributes",
    )
    attributes = {name: getattr(self, name) for name in exported}

    return frame, attributes
def export_dataset(self, export_metadata=False):
    """Export the dataset and supporting attributes (placeholder).

    Nothing is written yet; the method returns None.
    TODO: The preferred file format is HDF

    Args:
        export_metadata (bool): Requesting metadata export is not supported.

    Raises:
        NotImplementedError: If `export_metadata` is truthy.
    """
    if not export_metadata:
        return None

    raise NotImplementedError("The option to export metadata has not been implemented yet")
def import_dataset(self, import_metadata=False):
    """Import the dataset and supporting attributes (placeholder).

    Nothing is read yet; the method returns None.
    TODO: The preferred file format is HDF

    Args:
        import_metadata (bool): Requesting metadata import is not supported.

    Raises:
        NotImplementedError: If `import_metadata` is truthy.
    """
    if not import_metadata:
        return None

    raise NotImplementedError("The option to import metadata has not been implemented yet")
def split(self, num_or_size_splits, shuffle=False, seed=None):
    """Split this dataset into multiple partitions.

    Args:
        num_or_size_splits (list, tuple, numpy.ndarray or int): If
            `num_or_size_splits` is an int, *k*, the value is the number of
            equal-sized folds to make (if *k* does not evenly divide the
            dataset these folds are approximately equal-sized). If it is a
            sequence of ints, the values are taken as the indices at which
            to split the dataset. If every value is <= 1, the values are
            treated as fractional proportions of the dataset at which to
            split.
        shuffle (bool, optional): Randomly shuffle the dataset before
            splitting.
        seed (int or array_like): Takes the same argument as
            :func:`numpy.random.seed()`. Seeds NumPy's global random state.

    Returns:
        list: Splits. Contains *k* or `len(num_or_size_splits) + 1`
        datasets depending on `num_or_size_splits`. Each split is a
        `self.copy()` with its row-wise arrays, instance names and metadata
        replaced.
    """
    # Set seed
    if seed is not None:
        np.random.seed(seed)

    n = self.features.shape[0]

    # Treat tuples and arrays like lists so every documented sequence type
    # is read as split points rather than as a fold count.
    if isinstance(num_or_size_splits, np.ndarray):
        num_or_size_splits = num_or_size_splits.tolist()
    elif isinstance(num_or_size_splits, tuple):
        num_or_size_splits = list(num_or_size_splits)

    if isinstance(num_or_size_splits, list):
        num_folds = len(num_or_size_splits) + 1
        if num_folds > 1 and all(x <= 1. for x in num_or_size_splits):
            # Fractions of the dataset -> absolute row positions.
            num_or_size_splits = [int(x * n) for x in num_or_size_splits]
    else:
        num_folds = num_or_size_splits

    order = list(np.random.permutation(n) if shuffle else range(n))

    folds = [self.copy() for _ in range(num_folds)]

    features = np.array_split(self.features[order], num_or_size_splits)
    labels = np.array_split(self.labels[order], num_or_size_splits)
    scores = np.array_split(self.scores[order], num_or_size_splits)
    protected_attributes = np.array_split(self.protected_attributes[order],
                                          num_or_size_splits)
    instance_weights = np.array_split(self.instance_weights[order],
                                      num_or_size_splits)
    instance_names = np.array_split(np.array(self.instance_names)[order],
                                    num_or_size_splits)

    for fold, feats, labs, scors, prot_attrs, inst_wgts, inst_name in zip(
            folds, features, labels, scores, protected_attributes,
            instance_weights, instance_names):
        fold.features = feats
        fold.labels = labs
        fold.scores = scors
        fold.protected_attributes = prot_attrs
        fold.instance_weights = inst_wgts
        fold.instance_names = list(map(str, inst_name))
        # Copy before updating so the folds do not share one metadata dict
        # with each other or with this dataset.
        fold.metadata = fold.metadata.copy()
        fold.metadata.update({
            'transformer': '{}.split'.format(type(self).__name__),
            'params': {'num_or_size_splits': num_or_size_splits,
                       'shuffle': shuffle},
            'previous': [self]
        })

    return folds
@staticmethod
def _de_dummy_code_df(df, sep="=", set_category=False):
    """De-dummy code a dummy-coded dataframe obtained with pd.get_dummies().

    Columns named `<base><sep><level>` are collapsed into one `<base>`
    column holding the level (as a string) whose indicator equals 1. All
    other columns are copied unchanged.

    Args:
        df (pandas.DataFrame): Input dummy coded dataframe
        sep (char): Separator between base name and dummy code
        set_category (bool): Set the de-dummy coded features
            to categorical type

    Returns:
        pandas.DataFrame: New frame with the non-dummy columns first,
        followed by one column per dummy-coded base name.

    Examples:
        >>> columns = ["Age", "Gender=Male", "Gender=Female"]
        >>> df = pd.DataFrame([[10, 1, 0], [20, 0, 1]], columns=columns)
        >>> _de_dummy_code_df(df, sep="=")
           Age  Gender
        0   10    Male
        1   20  Female
    """
    # Parse with the caller's separator; the default "=" would otherwise
    # leave columns coded with a different separator untouched.
    feature_names_dum_d, feature_names_nodum = \
        StructuredDataset._parse_feature_names(df.columns, sep=sep)
    df_new = pd.DataFrame(
        index=df.index,
        columns=feature_names_nodum + list(feature_names_dum_d.keys()))

    for fname in feature_names_nodum:
        df_new[fname] = df[fname].values.copy()

    for fname, levels in feature_names_dum_d.items():
        for level in levels:
            # Rows whose indicator column is 1 get this level.
            df_new.loc[df[fname + sep + str(level)] == 1, fname] = str(level)

    if set_category:
        for fname in feature_names_dum_d.keys():
            df_new[fname] = df_new[fname].astype('category')

    return df_new
@staticmethod
def _parse_feature_names(feature_names, sep="="):
    """Parse feature names to ordinary and dummy coded candidates.

    A name containing `sep` is split at its first occurrence into a base
    name and a category; every other name is kept as it is.

    Args:
        feature_names (list): Names of features
        sep (char): Separator to designate the dummy coded category in the
            feature name

    Returns:
        (dict, list):

            * feature_names_dum_d (dict): Keys are the base feature names
              and values are the categories.
            * feature_names_nodum (list): Non-dummy coded feature names.

    Examples:
        >>> feature_names = ["Age", "Gender=Male", "Gender=Female"]
        >>> StructuredDataset._parse_feature_names(feature_names, sep="=")
        (defaultdict(<class 'list'>, {'Gender': ['Male', 'Female']}), ['Age'])
    """
    feature_names_dum_d = defaultdict(list)
    feature_names_nodum = []
    for fname in feature_names:
        if sep in fname:
            # Split once only, so the category itself may contain `sep`.
            fname_dum, v = fname.split(sep, 1)
            feature_names_dum_d[fname_dum].append(v)
        else:
            feature_names_nodum.append(fname)

    return feature_names_dum_d, feature_names_nodum