import hashlib
import json
import os
import shutil
from collections import defaultdict
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

from typing_extensions import Self

import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import OrdinalEncoder, StandardScaler

from .utils import load_from, log_resource_usage, save_to


class ForeignKey:
    def __init__(self,
                 child_table_name: str,
                 parent_table_name: str,
                 child_column_names: Union[str, Sequence[str]],
                 parent_column_names: Optional[Union[str, Sequence[str]]] = None,
                 unique: bool = False,
                 total_participate: bool = False):
        self.child_table_name = child_table_name
        self.parent_table_name = parent_table_name
        self.child_column_names = child_column_names if not isinstance(child_column_names, str) \
            else [child_column_names]
        if parent_column_names is None:
            parent_column_names = self.child_column_names
        self.parent_column_names = parent_column_names if \
            not isinstance(parent_column_names, str) else [parent_column_names]
        self.unique = unique
        self.total_participate = total_participate

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, ForeignKey):
            return False
        for k in ["child_table_name", "parent_table_name", "child_column_names"]:
            if getattr(self, k) != getattr(other, k):
                return False
        return True

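# Usage sketch (illustrative only; the "orders"/"users" schema and its column
# names are hypothetical, not part of this module). Single column names are
# normalized to one-element lists:
#     fk = ForeignKey("orders", "users", "user_id", parent_column_names="id")
#     fk.child_column_names   # -> ["user_id"]
#     fk.parent_column_names  # -> ["id"]
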

class TableConfig:
    def __init__(self,
                 name: str,
                 primary_key: Optional[Union[str, Sequence[str]]] = None,
                 foreign_keys: Optional[Sequence[ForeignKey]] = None,
                 sortby: Optional[str] = None,
                 id_columns: Optional[Sequence[str]] = None,
                 inequality: Optional[Sequence[Union[
                     Tuple[str, str], Tuple[Tuple[str, ...], Tuple[str, ...]]
                 ]]] = None):
        self.name = name
        self.primary_key = primary_key if not isinstance(primary_key, str) else [primary_key]
        self.foreign_keys = foreign_keys if foreign_keys is not None else []
        self.sortby = sortby
        self.id_columns = id_columns if id_columns is not None else []
        self.inequality = [
            ([a], [b]) if isinstance(a, str) else (a, b) for a, b in inequality
        ] if inequality is not None else []

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Self:
        foreign_keys = data.get("foreign_keys", [])
        foreign_keys = [ForeignKey(**x, child_table_name=data["name"]) for x in foreign_keys]
        data = data.copy()
        data["foreign_keys"] = foreign_keys
        return cls(**data)

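# Illustrative from_dict input (hypothetical schema; dict keys mirror the
# __init__ parameters, and FK dicts omit child_table_name because it is
# injected from "name"):
#     config = TableConfig.from_dict({
#         "name": "orders",
#         "primary_key": "order_id",
#         "sortby": "timestamp",
#         "foreign_keys": [{"parent_table_name": "users",
#                           "child_column_names": "user_id",
#                           "parent_column_names": "id"}],
#     })
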

class TableTransformer:
    def __init__(self, config: TableConfig):
        self.config = config
        self.columns = []
        self.categorical_columns = []
        self.numeric_columns = []
        self.agg_columns = None
        self.top_cat_values = {}
        self.agg_transformer = StandardScaler() if self.config.foreign_keys else None
        self.cat_transformer = OrdinalEncoder()
        self.num_transformer = StandardScaler()
        self.count_null = []
        self.split_dim = 0

    def fit(self, table: pd.DataFrame):
        for c in self.config.id_columns:
            if table[c].isna().any():
                self.count_null.append(c)
        self.columns = table.columns
        numeric_columns = table.select_dtypes(include=np.number).columns
        categorical_columns = table.drop(columns=numeric_columns.tolist()).columns
        self.categorical_columns = [
            c for c in categorical_columns if c not in self.config.id_columns
        ]
        self.numeric_columns = [
            c for c in numeric_columns if c not in self.config.id_columns
        ]
        for c in self.categorical_columns:
            # keep the three most frequent category values; value_counts is
            # indexed by the values themselves, so take .index, not .values
            self.top_cat_values[c] = table[c].value_counts().iloc[:3].index.tolist()
        if self.config.foreign_keys:
            aggregated, table = self.aggregate(table)
            aggregated = aggregated.bfill().ffill()
            self.agg_columns = aggregated.columns
            if aggregated.shape[-1] > 0:
                self.agg_transformer.fit(aggregated.values)
        table = table.bfill().ffill()
        if self.categorical_columns:
            cat = self.cat_transformer.fit_transform(table[self.categorical_columns].values)
            self.split_dim = cat.shape[1]
        else:
            self.split_dim = 0
        if self.numeric_columns:
            self.num_transformer.fit(table[self.numeric_columns].values)

    def aggregate(self, table: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
        if not self.config.foreign_keys:
            raise RuntimeError(f"Table {self.config.name} has no FK, so aggregate is not a valid operation.")
        groupby_columns = self.config.foreign_keys[0].child_column_names
        groupby = table.groupby(groupby_columns)
        if self.config.sortby:
            first_sortby: pd.Series = groupby[self.config.sortby].head(1)
            first_sortby.index = pd.MultiIndex.from_frame(
                table.loc[first_sortby.index, groupby_columns]
            )
            sortby_diff: pd.Series = groupby[self.config.sortby].diff()
            sortby_diff = sortby_diff.fillna(sortby_diff.mean())
            table = pd.concat([
                table.drop(columns=[self.config.sortby]),
                sortby_diff.to_frame(self.config.sortby)
            ], axis=1)[table.columns]
            groupby = table.groupby(groupby_columns)
            out = self._aggregate_values(groupby)
            out = pd.concat([
                out, pd.concat({self.config.sortby: first_sortby.to_frame("first")}, axis=1)
            ], axis=1)
        else:
            out = self._aggregate_values(groupby)
        out.columns = pd.Index([f"{a}${b}" for a, b in out.columns])
        return out, table

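# Worked example of the sortby re-encoding above (numbers are made up): a
# group with timestamps [3, 5, 9] is stored as first=3 in the aggregated
# frame and as per-row diffs [<mean-filled>, 2, 4] in the returned table;
# inverse_transform later restores the first value and recovers the
# originals via a per-group cumsum.
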
    def _aggregate_values(self, groupby):
        if self.numeric_columns:
            num_groupby = groupby[self.numeric_columns]
            out = num_groupby.aggregate(["mean", "median", "std"]).fillna(0)
        else:
            out = pd.concat({"": groupby.size().to_frame()[[]]}, axis=1)
        if len(self.categorical_columns) > 0:
            cat_groupby = groupby[self.categorical_columns]
            out = pd.concat([out, self._aggregate_categorical(cat_groupby)], axis=1)
        if len(self.count_null) > 0:
            null_groupby = groupby[self.count_null].aggregate(lambda group: group.isna().mean())
            null_groupby = pd.concat({"null-ratio": null_groupby}, axis=1).swaplevel(0, 1, axis=1)
            out = pd.concat([out, null_groupby], axis=1)
        if out.index.nlevels <= 1:
            out.index = pd.MultiIndex.from_arrays([out.index], names=[out.index.name])
        return out

    def _aggregate_categorical(self, grouped: pd.core.groupby.generic.DataFrameGroupBy) -> pd.DataFrame:
        df = grouped.obj
        group_keys = grouped.grouper.names
        results = {}
        sizes = grouped.size()
        for col, values in self.top_cat_values.items():
            ctab = pd.crosstab(index=[df[k] for k in group_keys], columns=df[col])
            ctab = ctab.reindex(columns=values, fill_value=0)
            ctab_ratio = ctab / sizes.loc[ctab.index].values.reshape((-1, 1))
            results[col] = ctab_ratio
        return pd.concat(results, axis=1)

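# Per group, only the ratios of the top-3 values captured in fit survive
# (hypothetical numbers): a group of size 4 with colors
# ["red", "red", "blue", "green"] and top values ["red", "blue", "pink"]
# yields ratios (0.5, 0.25, 0.0); "green" is simply dropped.
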
    def transform(self, table: pd.DataFrame) -> Tuple[
        np.ndarray, Optional[Dict[Tuple, np.ndarray]], Optional[np.ndarray], Optional[pd.Index]
    ]:
        table = table.reset_index(drop=True)
        if self.config.foreign_keys:
            groups = table.groupby(self.config.foreign_keys[0].child_column_names).groups
            groups = {
                k: v.values for k, v in groups.items()
            }
            aggregated, table = self.aggregate(table)
            if aggregated.index.nlevels <= 1:
                groups = {(k,): v for k, v in groups.items()}
            agg_index = aggregated.index
            if aggregated.shape[-1] > 0:
                aggregated = self.agg_transformer.transform(aggregated.values)
            else:
                aggregated = aggregated.values
        else:
            groups = None
            agg_index = None
            aggregated = None
        if self.categorical_columns:
            cat = self.cat_transformer.transform(
                table[self.categorical_columns].values
            ) / np.array([len(x) for x in self.cat_transformer.categories_]).reshape((1, -1))
        else:
            cat = np.zeros((table.shape[0], 0))
        if self.numeric_columns:
            num = self.num_transformer.transform(table[self.numeric_columns].values)
        else:
            num = np.zeros((table.shape[0], 0))
        transformed = np.concatenate([cat, num], axis=1)
        return transformed, groups, aggregated, agg_index

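# transform returns, in order:
#   transformed: ordinal-encoded categoricals scaled into [0, 1) followed by
#                standardized numerics, one row per input row;
#   groups:      FK-value tuple -> row indices of that group (None without FKs);
#   aggregated:  standardized per-group aggregate matrix (None without FKs);
#   agg_index:   MultiIndex of FK values labelling the aggregated rows.
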
    def inverse_transform(self, transformed: np.ndarray, groups: Optional[Dict[Tuple, np.ndarray]] = None,
                          aggregated: Optional[np.ndarray] = None, agg_index: Optional[pd.Index] = None
                          ) -> pd.DataFrame:
        if self.categorical_columns:
            n_categories = np.array([len(x) for x in self.cat_transformer.categories_])
            # undo the division by the category counts applied in transform
            # before clipping and rounding back to ordinal codes
            cat = transformed[:, :self.split_dim] * n_categories.reshape((1, -1))
            cat = np.clip(cat, 0, n_categories - 1).round()
            cat = self.cat_transformer.inverse_transform(cat)
            cat = pd.DataFrame(cat, columns=self.categorical_columns)
        else:
            cat = pd.DataFrame(index=np.arange(transformed.shape[0]), columns=[])
        if self.numeric_columns:
            num = self.num_transformer.inverse_transform(transformed[:, self.split_dim:])
            num = pd.DataFrame(num, columns=self.numeric_columns)
        else:
            num = pd.DataFrame(index=np.arange(transformed.shape[0]), columns=[])
        table = pd.concat([cat, num], axis=1)
        for c in self.config.id_columns:
            table[c] = np.arange(table.shape[0])
        table = table[self.columns]
        if self.config.foreign_keys:
            groupby_columns = self.config.foreign_keys[0].child_column_names
            for vals, idx in groups.items():
                table.loc[idx, groupby_columns] = pd.Series(
                    {c: v for c, v in zip(groupby_columns, vals)}
                ).to_frame().T.loc[[0] * idx.shape[0]].set_axis(idx, axis=0)
            if self.config.sortby:
                aggregated = self.agg_transformer.inverse_transform(aggregated)
                aggregated = pd.DataFrame(aggregated, index=agg_index, columns=self.agg_columns)
                first_sortby = aggregated[f"{self.config.sortby}$first"]
                head = table.groupby(groupby_columns)[groupby_columns].head(1)
                agg_idx_to_table_idx = {
                    tuple(row[groupby_columns]): i for i, row in head.iterrows()
                }
                first_sortby.index = [agg_idx_to_table_idx[x] for x in first_sortby.index]
                table.loc[head.index, self.config.sortby] = first_sortby
                table[self.config.sortby] = table.groupby(groupby_columns)[self.config.sortby].cumsum()
        return table

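# Round-trip sketch (hypothetical df matching the fitted schema; with no FKs
# configured, pass only the encoded array):
#     t = TableTransformer(config)
#     t.fit(df)
#     enc, groups, agg, idx = t.transform(df)
#     df2 = t.inverse_transform(enc, groups, agg, idx)  # ~df, up to encoding loss
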
    @classmethod
    def load(cls, path: str) -> Self:
        return load_from(path)

    def save(self, path: str):
        save_to(self, path)


class RelationalTransformer:
    def __init__(self,
                 tables: Dict[str, TableConfig],
                 order: List[str],
                 max_ctx_dim: int = 100):
        self.order = order
        self.transformers = {}
        self.children: Dict[str, List[ForeignKey]] = defaultdict(list)
        for tn in order:
            config = tables[tn]
            self.transformers[tn] = TableTransformer(config)
            for fk in config.foreign_keys:
                self.children[fk.parent_table_name].append(fk)
        self.max_ctx_dim = max_ctx_dim
        self._fitted_cache_dir = None
        self._sizes_of = {}
        self._nullable = {}
        self._parent_dims = {}
        self._core_dims = {}

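# Construction sketch (hypothetical two-table schema; parent tables should
# precede their children in `order`, since fitting and extension only look at
# tables that come earlier):
#     rt = RelationalTransformer(
#         tables={"users": users_config, "orders": orders_config},
#         order=["users", "orders"],
#     )
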
    def fit(self, tables: Dict[str, str], cache_dir: str = "./cache", resource_path: str = "./cache/resource.csv"):
        self._fitted_cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
        for tn in self.order:
            table = pd.read_csv(tables[tn])
            self._sizes_of[tn] = table.shape[0]
            table.to_csv(os.path.join(cache_dir, f"{tn}.csv"), index=False)
            with log_resource_usage(resource_path, f"fit table {tn} transformer"):
                transformer = self.transformers[tn]
                if len(set(table.columns)) != len(table.columns):
                    raise ValueError(f"Same column name repeated in one table ({tn}).")
                transformer.fit(table)
                transformer.save(os.path.join(cache_dir, f"{tn}-transformer.pkl"))
            foreign_keys = self.transformers[tn].config.foreign_keys
            if foreign_keys:
                self._nullable[tn] = []
                with log_resource_usage(resource_path, f"transform {tn} FK"):
                    encoded, groups, aggregated, agg_index = transformer.transform(table)
                save_to({
                    "actual": (None, None, encoded, None)
                }, os.path.join(cache_dir, f"{tn}.pkl"))
                with log_resource_usage(resource_path, f"extend {tn}"):
                    key, context, new_encoded = self._extend_till(tn, tn, table.columns.tolist(), cache_dir)
                float_cols = [
                    c for c in key.select_dtypes(include="float").columns
                    if c not in self.transformers[tn].config.id_columns
                ]
                if np.abs(encoded - new_encoded[:, self._core_dims[tn]]).mean() > 1e-5 or not (
                    key.equals(table) or (
                        (len(float_cols) == 0
                         or (key[float_cols] - table[float_cols]).abs().values.mean() <= 1e-5)
                        and key.drop(columns=float_cols).equals(table.drop(columns=float_cols))
                    )
                ):
                    raise RuntimeError(
                        f"Error when extending: {np.abs(encoded - new_encoded[:, self._core_dims[tn]]).mean()}, "
                        f"{key.equals(table)}, {len(float_cols)}, "
                        f"{(key[float_cols] - table[float_cols]).abs().values.mean()}, "
                        f"{key.drop(columns=float_cols).equals(table.drop(columns=float_cols))}."
                    )
                agg_context = np.zeros((aggregated.shape[0], 0))
                actual_context = np.zeros((aggregated.shape[0], 0))
                transformed_context = np.zeros((encoded.shape[0], 0))
                length = np.zeros(aggregated.shape[0])
                all_fk_info = []
                for fi, fk in enumerate(foreign_keys):
                    fk_info = {}
                    with log_resource_usage(
                        resource_path, f"get degrees {tn}.({'|'.join(fk.child_column_names)})[{fi}]"
                    ):
                        parent_key, parent_context, parent_encoded = self._extend_till(
                            fk.parent_table_name, tn, fk.parent_column_names, cache_dir,
                            fitting=False, queue=[fk]
                        )
                        degree_x = np.concatenate([parent_context, parent_encoded], axis=1)
                        degree_y = table[fk.child_column_names].groupby(fk.child_column_names).size()
                        if degree_y.index.nlevels <= 1:
                            degree_y.index = pd.MultiIndex.from_arrays(
                                [degree_y.index], names=[degree_y.index.name]
                            )
                        if fi == 0:
                            raw_degree = degree_y[agg_index]
                        else:
                            raw_degree = None
                        parent_key_as_child = parent_key.rename(columns={
                            p: c for p, c in zip(fk.parent_column_names, fk.child_column_names)
                        })
                        y_order = pd.MultiIndex.from_frame(parent_key_as_child)
                        placeholder_degree_y = pd.Series(0, index=y_order)
                        placeholder_degree_y.loc[degree_y.index] = degree_y
                        degree_y = placeholder_degree_y.values
                    if fi == 0:
                        with log_resource_usage(resource_path, f"get context {tn}"):
                            non_zero_degree_x = pd.DataFrame(
                                degree_x, columns=[f"_dim{i:02d}" for i in range(degree_x.shape[-1])],
                                index=parent_key.index
                            )
                            non_zero_degree_x = pd.concat([parent_key_as_child, non_zero_degree_x], axis=1)
                            agg_context = agg_index.to_frame().reset_index(drop=True)
                            agg_context = agg_context.merge(
                                non_zero_degree_x, how="left", on=agg_index.names
                            )
                            agg_context = agg_context.set_index(agg_index.names)
                            if agg_context.index.nlevels <= 1:
                                agg_context.index = pd.MultiIndex.from_arrays(
                                    [agg_context.index], names=agg_index.names
                                )
                            length = raw_degree.values
                            agg_context = agg_context.loc[agg_index].values
                            actual_context = np.concatenate([agg_context, aggregated], axis=1)
                            actual_context = pd.DataFrame(actual_context, index=agg_index)
                            transformed_context = np.empty((encoded.shape[0], actual_context.shape[-1]))
                            for g, idx in groups.items():
                                transformed_context[idx] = actual_context.loc[g]
                            actual_context = actual_context.values
                    fk_info["degree"] = degree_x, degree_y
                    if table[fk.child_column_names].isna().any().any():
                        with log_resource_usage(
                            resource_path, f"get isna {tn}.({'|'.join(fk.child_column_names)})[{fi}]"
                        ):
                            isna_y = table[fk.child_column_names].isna().any(axis=1)
                            fk_info["isna"] = (
                                np.concatenate([transformed_context, new_encoded], axis=1), isna_y.values
                            )
                        self._nullable[tn].append(True)
                    else:
                        self._nullable[tn].append(False)
                    all_fk_info.append(fk_info)
                out = {
                    "aggregated": (agg_context, aggregated),
                    "actual": (
                        actual_context, length, new_encoded,
                        [groups[tuple(x) if isinstance(x, tuple) else (x,)] for x in agg_index]
                    ),
                    "foreign_keys": all_fk_info,
                }
            else:
                encoded, _, _, _ = transformer.transform(table)
                out = {
                    "encoded": encoded
                }
            save_to(out, os.path.join(cache_dir, f"{tn}.pkl"))

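# Fitting sketch (paths are hypothetical): `tables` maps each table name in
# `order` to its CSV path; intermediate encodings are cached per table under
# cache_dir as "{name}.csv", "{name}.pkl", and "{name}-transformer.pkl":
#     rt.fit({"users": "data/users.csv", "orders": "data/orders.csv"},
#            cache_dir="./cache")
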
    def _extend_till(self, table: str, till: str, keys: Sequence[str], cache_dir: str,
                     fitting: bool = True, queue: Optional[List[ForeignKey]] = None
                     ) -> Tuple[pd.DataFrame, np.ndarray, np.ndarray]:
        queue = queue if queue is not None else []
        allowed_tables = self.order[:self.order.index(till)]
        raw = pd.read_csv(os.path.join(cache_dir, f"{table}.csv"))
        if self.transformers[table].config.foreign_keys:
            _, _, encoded, _ = self.actual_generation_for(table, cache_dir)
        else:
            encoded = self.standalone_encoded_for(table, cache_dir)
        core_columns = [f"_dim{i:02d}" for i in range(encoded.shape[-1])]
        core = pd.DataFrame(encoded, columns=core_columns, index=raw.index)
        core = pd.concat([raw.index.to_frame(index=False, name="_id"), raw, core], axis=1)
        for fi, fk in enumerate(self.transformers[table].config.foreign_keys):
            if fk in queue:
                continue
            parent_raw, parent_context, parent_encoded = self._extend_till(
                fk.parent_table_name, till, fk.parent_column_names, cache_dir, fitting, queue + [fk]
            )
            parent_encoded = np.concatenate([parent_context, parent_encoded], axis=1)
            if table == till:
                parent_encoded = self._reduce_dims(
                    parent_encoded, fk.parent_table_name, fitting, queue + [fk], cache_dir, allowed_tables
                )
            parent_encoded = pd.DataFrame(
                parent_encoded, columns=[f"_dim{i:02d}_p{fi}" for i in range(parent_encoded.shape[-1])],
                index=np.arange(parent_encoded.shape[0])
            )
            parent_idx_df = parent_raw[fk.parent_column_names].rename(columns={
                p: c for p, c in zip(fk.parent_column_names, fk.child_column_names)
            })
            parent_encoded = pd.concat([parent_idx_df, parent_encoded], axis=1)
            core = core.merge(parent_encoded, on=fk.child_column_names, how="left").fillna(-1)
        for fi, fk in enumerate(self.children[table]):
            if fk.child_table_name not in allowed_tables or fk in queue:
                continue
            sibling_raw, sibling_context, sibling_encoded = self._extend_till(
                fk.child_table_name, till, fk.child_column_names, cache_dir, fitting, queue + [fk]
            )
            sibling_encoded = np.concatenate([sibling_context, sibling_encoded], axis=1)
            sibling_encoded = self._reduce_dims(
                sibling_encoded, fk.child_table_name, fitting, queue + [fk], cache_dir, allowed_tables
            )
            encoded_columns = [f"_dim{i:02d}_c{fi}" for i in range(sibling_encoded.shape[-1])]
            sibling_encoded = pd.DataFrame(
                sibling_encoded, columns=encoded_columns, index=np.arange(sibling_encoded.shape[0])
            )
            sibling_idx_df = sibling_raw[fk.child_column_names].rename(columns={
                c: p for c, p in zip(fk.child_column_names, fk.parent_column_names)
            })
            sibling_encoded = pd.concat([sibling_idx_df, sibling_encoded], axis=1)
            sibling_encoded_aggregated = sibling_encoded.groupby(fk.parent_column_names).aggregate(["mean", "std"])
            sibling_encoded_aggregated = sibling_encoded_aggregated.reset_index()
            sibling_encoded_aggregated.columns = pd.Index([
                f"{a}${b}" if b else a for a, b in sibling_encoded_aggregated.columns
            ])
            core = core.merge(
                sibling_encoded_aggregated, on=fk.parent_column_names, how="left"
            ).fillna(0)
        core = core.set_index("_id").loc[raw.index]
        raw_keys = raw[keys]
        context_columns = [c for c in core.columns if c.startswith("_dim") and c.endswith("_p0")]
        context = core[context_columns]
        encoded = core.drop(columns=context_columns + raw.columns.tolist())
        if fitting and table == till:
            parent_dims = []
            name_to_id = {
                c: i for i, c in enumerate(encoded.columns)
            }
            for fi in range(0, len(self.transformers[table].config.foreign_keys)):
                parent_dims.append([
                    name_to_id[n] for n in encoded.columns if n.endswith(f"_p{fi}") and n.startswith("_dim")
                ])
            self._parent_dims[table] = parent_dims
            self._core_dims[table] = [name_to_id[n] for n in core_columns]
        if raw_keys.shape[0] != encoded.values.shape[0]:
            raise RuntimeError(f"Extended table shape changed: {raw_keys.shape, raw.shape, encoded.shape}")  # TODO: remove
        return raw_keys, context.values, encoded.values

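# Column-naming convention used by _extend_till: the table's own encoding
# occupies "_dim00", "_dim01", ...; columns merged in from the fi-th parent
# are suffixed "_p{fi}" and those aggregated up from the fi-th child table
# "_c{fi}". The "_p0" block is split off as the generation context; everything
# else (minus the raw columns) forms the extended encoding that is returned.
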
    def _reduce_dims(self, parent_encoded: np.ndarray, table: str, fitting: bool, queue: List[ForeignKey],
                     cache_dir: str, allowed_tables: List[str]) -> np.ndarray:
        if parent_encoded.shape[-1] > self.max_ctx_dim:
            queue_str = json.dumps([
                f"parent={qfk.parent_table_name}, child={qfk.child_table_name}, "
                f"columns={qfk.child_column_names}" for qfk in queue
            ])
            pca_name = f"{table}_{len(allowed_tables)}_{hashlib.sha1(queue_str.encode()).hexdigest()}"
            os.makedirs(os.path.join(cache_dir, "pca"), exist_ok=True)
            pca_path = os.path.join(cache_dir, "pca", f"{pca_name}.pkl")
            if fitting:
                if os.path.exists(pca_path):
                    raise FileExistsError(f"File for PCA already exists: {table} {allowed_tables[-1]} {queue}.")
                pca = PCA(n_components=self.max_ctx_dim)
                parent_encoded = pca.fit_transform(parent_encoded)
                save_to(pca, pca_path)
            else:
                pca = load_from(pca_path)
                parent_encoded = pca.transform(parent_encoded)
        return parent_encoded

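# Each PCA is cached under cache_dir/pca/ with a name built from the source
# table, the target table's position in the generation order, and a SHA-1 of
# the serialized FK path, so sampling (fitting=False) reloads exactly the
# projection that was fitted for the same recursion path.
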
    def fitted_size_of(self, table_name: str) -> int:
        return self._sizes_of[table_name]

    @classmethod
    def standalone_encoded_for(cls, table_name: str, cache_dir: str = "./cache") -> np.ndarray:
        return load_from(os.path.join(cache_dir, f"{table_name}.pkl"))["encoded"]

    @classmethod
    def degree_prediction_for(cls, table_name: str, fk_idx: int, cache_dir: str = "./cache") -> Tuple[
        np.ndarray, Optional[np.ndarray]
    ]:
        return load_from(os.path.join(cache_dir, f"{table_name}.pkl"))["foreign_keys"][fk_idx]["degree"]

    @classmethod
    def isna_indicator_prediction_for(cls, table_name: str, fk_idx: int, cache_dir: str = "./cache") -> Optional[Tuple[
        np.ndarray, Optional[np.ndarray]
    ]]:
        return load_from(os.path.join(cache_dir, f"{table_name}.pkl"))["foreign_keys"][fk_idx].get("isna")

    @classmethod
    def aggregated_generation_for(cls, table_name: str, cache_dir: str = "./cache") -> Tuple[
        np.ndarray, Optional[np.ndarray]
    ]:
        return load_from(os.path.join(cache_dir, f"{table_name}.pkl"))["aggregated"]

    @classmethod
    def actual_generation_for(cls, table_name: str, cache_dir: str = "./cache") -> Tuple[
        np.ndarray, np.ndarray, Optional[np.ndarray], Optional[List[np.ndarray]]
    ]:
        return load_from(os.path.join(cache_dir, f"{table_name}.pkl"))["actual"]

    def fk_matching_for(self, table_name: str, fk_idx: int, sampled_dir: str = "./cache") -> Tuple[
        np.ndarray, np.ndarray, np.ndarray, np.ndarray, List[Optional[np.ndarray]], List[np.ndarray]
    ]:
        loaded = load_from(os.path.join(sampled_dir, f"{table_name}.pkl"))
        _, _, values, groups = loaded["actual"]
        values = values[:, self._parent_dims[table_name][fk_idx]]
        parent, degrees = loaded["foreign_keys"][fk_idx]["degree"]
        fk = self.transformers[table_name].config.foreign_keys[fk_idx]
        parent = self._reduce_dims(
            parent, fk.parent_table_name,
            False, [fk], self._fitted_cache_dir, self.order[:self.order.index(table_name)]
        )
        if values.shape[-1] != parent.shape[-1]:
            raise RuntimeError(f"The sizes to be matched are different: {values.shape}, {parent.shape}.")
        isnull = loaded["foreign_keys"][fk_idx].get("isna")
        if isnull is None:
            return_isna = np.zeros(values.shape[0], dtype=np.bool_)
        else:
            _, return_isna = isnull
        # collect previously matched FK values
        key_df = pd.DataFrame(index=pd.RangeIndex(values.shape[0]))
        for i, fk in enumerate(self.transformers[table_name].config.foreign_keys[:fk_idx]):
            parent_match = loaded["foreign_keys"][i]["match"]
            existing_vals = pd.read_csv(
                os.path.join(sampled_dir, f"{fk.parent_table_name}.csv")
            ).rename(
                columns={p: c for p, c in zip(fk.parent_column_names, fk.child_column_names)}
            )[fk.child_column_names]
            isna = np.isnan(parent_match.astype(np.float32))
            if isna.any():
                dummy_idx = existing_vals.shape[0]
                existing_vals.loc[dummy_idx, existing_vals.columns] = np.nan
                existing_vals = existing_vals.iloc[
                    np.where(isna, dummy_idx, parent_match)
                ].reset_index(drop=True)
            else:
                existing_vals = existing_vals.iloc[parent_match].reset_index(drop=True)
            if set(key_df.columns) & set(existing_vals.columns):
                same_cols = [*set(key_df.columns) & set(existing_vals.columns)]
                if key_df[same_cols][~return_isna].equals(
                    existing_vals[same_cols][~return_isna].astype(key_df[same_cols].dtypes)
                ):
                    new_cols = [*set(existing_vals.columns) - set(key_df.columns)]
                    key_df[new_cols] = existing_vals[new_cols]
                else:
                    raise RuntimeError(f"Overlapping FKs in previous FKs invalid ({table_name})[{fk_idx}].")
            else:
                key_df[fk.child_column_names] = existing_vals
        pools = [None] * values.shape[0]
        # overlapping FKs restrict the candidate pools
        curr_fk = self.transformers[table_name].config.foreign_keys[fk_idx]
        prev_fk_cols = set()
        this_parent_raw = pd.read_csv(os.path.join(sampled_dir, f"{curr_fk.parent_table_name}.csv")).rename(
            columns={p: c for p, c in zip(curr_fk.parent_column_names, curr_fk.child_column_names)}
        )
        for i, fk in enumerate(self.transformers[table_name].config.foreign_keys[:fk_idx]):
            set1_cols = set(curr_fk.child_column_names) & set(fk.child_column_names)
            if set1_cols:
                set1_cols = [*set1_cols]
                existing_vals = key_df[set1_cols]
                this_parent_to_overlap_grouped = this_parent_raw[set1_cols].groupby(set1_cols)
                for ov, rows in existing_vals.groupby(set1_cols):
                    try:
                        this_parent_rows = this_parent_to_overlap_grouped.get_group(ov)
                        allowed_choices = this_parent_rows.index.values
                        for r in rows.index:
                            if pools[r] is None:
                                pools[r] = allowed_choices
                            else:
                                pools[r] = np.intersect1d(pools[r], allowed_choices)
                    except KeyError:
                        pass
            prev_fk_cols |= set(fk.child_column_names)
        curr_fk_cols = set(curr_fk.child_column_names)
        all_fk_cols = prev_fk_cols | curr_fk_cols
        # inequality constraints also restrict the candidate pools
        for (a, b) in self.transformers[table_name].config.inequality:
            this_ineq_cols = set(a) | set(b)
            if this_ineq_cols <= all_fk_cols and not this_ineq_cols <= prev_fk_cols:
                for i, fk in enumerate(self.transformers[table_name].config.foreign_keys[:fk_idx]):
                    set1_cols = set(fk.child_column_names) & this_ineq_cols
                    set2_cols = this_ineq_cols - prev_fk_cols
                    if set1_cols:
                        if set1_cols & set(a):
                            set1_cols = [x for x in a if x in set1_cols]
                            set2_cols = [x for x in b if x in set2_cols]
                        else:
                            set1_cols = [x for x in b if x in set1_cols]
                            set2_cols = [x for x in a if x in set2_cols]
                        existing_vals = key_df[set1_cols]
                        this_parent_to_overlap_grouped = this_parent_raw[set2_cols].groupby(set2_cols)
                        for ov, rows in existing_vals.groupby(set1_cols):
                            try:
                                this_parent_rows = this_parent_to_overlap_grouped.get_group(ov)
                                disallowed_choices = this_parent_rows.index.values
                                for r in rows.index:
                                    if pools[r] is None:
                                        pools[r] = np.setdiff1d(
                                            np.arange(this_parent_raw.shape[0]), disallowed_choices
                                        )
                                    else:
                                        pools[r] = np.setdiff1d(pools[r], disallowed_choices)
                            except KeyError:
                                pass
        # uniqueness constraint groups derived from the primary key
        uniqueness_groups = []
        if self.transformers[table_name].config.primary_key:
            pk_cols = set(self.transformers[table_name].config.primary_key)
            if pk_cols <= (curr_fk_cols | prev_fk_cols) and not pk_cols <= prev_fk_cols:
                core_cols = [*pk_cols & prev_fk_cols]
                for g, d in key_df.groupby(core_cols):
                    uniqueness_groups.append(d.index.values)
        return values, parent, degrees, return_isna, pools, uniqueness_groups

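# Return value sketch: `values` are the child rows' target dims for this FK,
# `parent` the candidate parent encodings, `degrees` the per-parent child
# counts, `return_isna` marks children whose FK should stay null, `pools[r]`
# is the set of parent row positions allowed for child r (None means
# unrestricted), and `uniqueness_groups` lists child-row groups that must
# each receive distinct parents to keep the primary key unique.
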
    def prepare_sampled_dir(self, sampled_dir: str):
        if os.path.exists(sampled_dir):
            shutil.rmtree(sampled_dir)
        os.makedirs(sampled_dir, exist_ok=True)
        if os.path.exists(os.path.join(self._fitted_cache_dir, "pca")):
            shutil.copytree(os.path.join(self._fitted_cache_dir, "pca"), os.path.join(sampled_dir, "pca"))

    @classmethod
    def save_standalone_encoded_for(cls, table_name: str, encoded: np.ndarray, sampled_dir: str = "./sampled"):
        save_to({"encoded": encoded}, os.path.join(sampled_dir, f"{table_name}.pkl"))

    @classmethod
    def save_degree_for(cls, table_name: str, fk_idx: int, degree: np.ndarray, sampled_dir: str = "./sampled"):
        loaded = load_from(os.path.join(sampled_dir, f"{table_name}.pkl"))
        x, _ = loaded["foreign_keys"][fk_idx]["degree"]
        loaded["foreign_keys"][fk_idx]["degree"] = x, degree
        if fk_idx == 0:
            a, b, c, d = loaded.get("actual", (None, None, None, None))
            non_zero_deg = degree > 0
            loaded["actual"] = a, degree[non_zero_deg], c, d
            non_zero_x = x[non_zero_deg]
            loaded["aggregated"] = non_zero_x, None
        save_to(loaded, os.path.join(sampled_dir, f"{table_name}.pkl"))

    def save_isna_indicator_for(self, table_name: str, fk_idx: int, isna: np.ndarray, sampled_dir: str = "./sampled"):
        loaded = load_from(os.path.join(sampled_dir, f"{table_name}.pkl"))
        x, _ = loaded["foreign_keys"][fk_idx]["isna"]
        loaded["foreign_keys"][fk_idx]["isna"] = x, isna
        a, b, encoded, d = loaded["actual"]
        encoded[np.ix_(isna, self._parent_dims[table_name][fk_idx])] = 0
        loaded["actual"] = a, b, encoded, d
        save_to(loaded, os.path.join(sampled_dir, f"{table_name}.pkl"))

    @classmethod
    def save_aggregated_info_for(cls, table_name: str, aggregated: np.ndarray, sampled_dir: str = "./sampled"):
        loaded = load_from(os.path.join(sampled_dir, f"{table_name}.pkl"))
        agg_context, _ = loaded["aggregated"]
        loaded["aggregated"] = agg_context, aggregated
        actual_context = np.concatenate([agg_context, aggregated], axis=1)
        _, length, _, _ = loaded["actual"]
        loaded["actual"] = actual_context, length, None, None
        save_to(loaded, os.path.join(sampled_dir, f"{table_name}.pkl"))

    @classmethod
    def save_actual_values_for(
        cls, table_name: str, values: np.ndarray, groups: List[np.ndarray], sampled_dir: str = "./sampled"
    ):
        loaded = load_from(os.path.join(sampled_dir, f"{table_name}.pkl"))
        context, length, _, _ = loaded["actual"]
        length = np.array([len(x) for x in groups])
        loaded["actual"] = context, length, values, groups
        for i, fk in enumerate(loaded["foreign_keys"]):
            isnull = fk["isna"]
            if isnull is not None:
                cids = np.repeat(np.arange(context.shape[0]), length.astype(int))
                loaded["foreign_keys"][i]["isna"] = np.concatenate([context[cids], values], axis=1), None
                break
        save_to(loaded, os.path.join(sampled_dir, f"{table_name}.pkl"))

    def save_matched_indices_for(self, table_name: str, fk_idx: int,
                                 indices: np.ndarray, sampled_dir: str = "./sampled"):
        loaded = load_from(os.path.join(sampled_dir, f"{table_name}.pkl"))
        loaded["foreign_keys"][fk_idx]["match"] = indices
        context, length, encoded, d = loaded["actual"]
        parent, _ = loaded["foreign_keys"][fk_idx]["degree"]
        isna = np.isnan(indices.astype(np.float32))
        if self._parent_dims[table_name][fk_idx]:
            fk = self.transformers[table_name].config.foreign_keys[fk_idx]
            encoded[np.ix_(np.nonzero(~isna)[0], self._parent_dims[table_name][fk_idx])] = self._reduce_dims(
                parent[indices[~isna].astype(np.int32)], fk.parent_table_name, False, [fk],
                self._fitted_cache_dir, self.order[:self.order.index(table_name)]
            )
        loaded["actual"] = context, length, encoded, d
        for i, fk in enumerate(loaded["foreign_keys"]):
            if i <= fk_idx:
                continue
            isnull = fk["isna"]
            if isnull is not None:
                cids = np.repeat(np.arange(context.shape[0]), length.astype(int))
                loaded["foreign_keys"][i]["isna"] = np.concatenate([context[cids], encoded], axis=1), None
                break
        save_to(loaded, os.path.join(sampled_dir, f"{table_name}.pkl"))

    def copy_fitted_for(self, table_name: str, sampled_dir: str = "./sampled"):
        shutil.copyfile(os.path.join(self._fitted_cache_dir, f"{table_name}.pkl"),
                        os.path.join(sampled_dir, f"{table_name}.pkl"))
        shutil.copyfile(os.path.join(self._fitted_cache_dir, f"{table_name}.csv"),
                        os.path.join(sampled_dir, f"{table_name}.csv"))

    def prepare_next_for(self, table_name: str, sampled_dir: str = "./cache"):
        if self.transformers[table_name].config.foreign_keys:
            _, aggregated = self.aggregated_generation_for(table_name, sampled_dir)
            _, _, encoded, indices = self.actual_generation_for(table_name, sampled_dir)
            _, deg = self.degree_prediction_for(table_name, 0, sampled_dir)
            foreign_keys = self.transformers[table_name].config.foreign_keys
            fk = foreign_keys[0]
            parent = pd.read_csv(os.path.join(sampled_dir, f"{fk.parent_table_name}.csv"))
            parent_idx = pd.MultiIndex.from_frame(parent[fk.parent_column_names].rename(columns={
                p: c for p, c in zip(fk.parent_column_names, fk.child_column_names)
            }))[deg > 0]
            groups = {
                pi: idx for pi, idx in zip(parent_idx, indices)
            }
            recovered = self.transformers[table_name].inverse_transform(
                encoded[:, self._core_dims[table_name]], groups, aggregated, parent_idx
            )
            occurred_cols = set()
            if len(foreign_keys) > 1:
                loaded = load_from(os.path.join(sampled_dir, f"{table_name}.pkl"))
            else:
                loaded = None
            for i, fk in enumerate(foreign_keys):
                if i == 0:
                    occurred_cols |= set(fk.child_column_names)
                    continue
                new_cols = [c for c in fk.child_column_names if c not in occurred_cols]
                match_indices = loaded["foreign_keys"][i]["match"]
                parent_table = pd.read_csv(os.path.join(sampled_dir, f"{fk.parent_table_name}.csv"))
                dummy_index = parent_table.shape[0]
                parent_table.loc[dummy_index, parent_table.columns] = np.nan
                recovered.loc[:, new_cols] = parent_table.iloc[np.where(
                    np.isnan(match_indices.astype(np.float32)), dummy_index, match_indices
                )].rename(
                    columns={p: c for p, c in zip(fk.parent_column_names, fk.child_column_names)}
                )[new_cols].set_axis(recovered.index, axis=0)
                occurred_cols |= set(fk.child_column_names)
        else:
            encoded = self.standalone_encoded_for(table_name, sampled_dir)
            recovered = self.transformers[table_name].inverse_transform(encoded)
        recovered.to_csv(os.path.join(sampled_dir, f"{table_name}.csv"), index=False)
        table_idx = self.order.index(table_name)
        if table_idx >= len(self.order) - 1:
            return
        next_table_name = self.order[table_idx + 1]
        degrees = []
        for i, fk in enumerate(self.transformers[next_table_name].config.foreign_keys):
            parent_raw, parent_context, parent_encoded = self._extend_till(
                fk.parent_table_name, next_table_name, fk.parent_column_names, sampled_dir, False, [fk]
            )
            parent_extend_till = np.concatenate([parent_context, parent_encoded], axis=1)
            degrees.append(parent_extend_till)
        save_to({
            "foreign_keys": [{
                "degree": (x, None), "isna": (None, None) if y else None
            } for x, y in zip(degrees, self._nullable.get(next_table_name, []))]
        }, os.path.join(sampled_dir, f"{next_table_name}.pkl"))
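
# End-to-end sampling loop sketch (hypothetical driver; the models that
# actually predict degrees, aggregates, encodings, and FK matches live
# outside this module and are elided here):
#     rt.prepare_sampled_dir("./sampled")
#     for tn in rt.order:
#         ...  # sample this table, then persist via the save_* hooks, e.g.
#         # rt.save_degree_for(tn, 0, sampled_degrees, "./sampled")
#         # rt.save_actual_values_for(tn, sampled_values, groups, "./sampled")
#         rt.prepare_next_for(tn, "./sampled")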