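"""Schema configuration and transformation utilities for relational tables.

Defines foreign-key and table configurations, per-table encoders
(`TableTransformer`), and a `RelationalTransformer` that fits, encodes,
extends with relational context, and inverts multi-table data, caching
intermediate artifacts to disk for later sampling.
"""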
import hashlib
import json
import os
import shutil
from collections import defaultdict
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
from typing_extensions import Self
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import OrdinalEncoder, StandardScaler
from .utils import load_from, log_resource_usage, save_to
class ForeignKey:
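    """A foreign-key relation from a child table to a parent table.

    If ``parent_column_names`` is omitted, the child column names are reused.
    ``unique`` and ``total_participate`` record uniqueness and
    total-participation constraints on the reference (stored here, consumed
    elsewhere). Equality compares only the child table, parent table, and
    child columns.
    """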
def __init__(self,
child_table_name: str,
parent_table_name: str,
child_column_names: Union[str, Sequence[str]],
parent_column_names: Optional[Union[str, Sequence[str]]] = None,
unique: bool = False,
total_participate: bool = False):
self.child_table_name = child_table_name
self.parent_table_name = parent_table_name
self.child_column_names = child_column_names if not isinstance(child_column_names, str) else [
child_column_names]
if parent_column_names is None:
parent_column_names = self.child_column_names
self.parent_column_names = parent_column_names if \
not isinstance(parent_column_names, str) else [parent_column_names]
self.unique = unique
self.total_participate = total_participate
def __eq__(self, other: Any) -> bool:
if not isinstance(other, ForeignKey):
return False
for k in ["child_table_name", "parent_table_name", "child_column_names"]:
if getattr(self, k) != getattr(other, k):
return False
return True
class TableConfig:
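    """Static schema description of a single table.

    Holds the primary key, foreign keys, an optional ``sortby`` column for
    sequentially ordered rows, ID columns excluded from encoding, and
    pairwise ``inequality`` constraints between column groups.
    """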
def __init__(self,
name: str,
primary_key: Optional[Union[str, Sequence[str]]] = None,
foreign_keys: Optional[Sequence[ForeignKey]] = None,
sortby: Optional[str] = None,
id_columns: Optional[Sequence[str]] = None,
inequality: Optional[Union[Tuple[str, str], Tuple[Tuple[str, ...], Tuple[str, ...]]]] = None):
self.name = name
self.primary_key = primary_key if not isinstance(primary_key, str) else [primary_key]
self.foreign_keys = foreign_keys if foreign_keys is not None else []
self.sortby = sortby
self.id_columns = id_columns if id_columns is not None else []
self.inequality = [
([a], [b]) if isinstance(a, str) else (a, b) for a, b in inequality
] if inequality is not None else []
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Self:
foreign_keys = data.get("foreign_keys", [])
foreign_keys = [ForeignKey(**x, child_table_name=data["name"]) for x in foreign_keys]
data = data.copy()
data["foreign_keys"] = foreign_keys
return cls(**data)
class TableTransformer:
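    """Encodes one table to a numeric matrix and decodes it back.

    Categorical columns are ordinal-encoded and scaled into [0, 1); numeric
    columns are standardized. Tables with foreign keys additionally get
    per-group aggregate features (see :meth:`aggregate`), and ID columns
    that contain nulls are tracked so their null ratios can be aggregated.
    """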
def __init__(self, config: TableConfig):
self.config = config
self.columns = []
self.categorical_columns = []
self.numeric_columns = []
self.agg_columns = None
self.top_cat_values = {}
self.agg_transformer = StandardScaler() if self.config.foreign_keys else None
self.cat_transformer = OrdinalEncoder()
self.num_transformer = StandardScaler()
self.count_null = []
self.split_dim = 0
def fit(self, table: pd.DataFrame):
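        """Infer column roles from ``table`` and fit the scalers/encoders."""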
for c in self.config.id_columns:
if table[c].isna().any():
self.count_null.append(c)
self.columns = table.columns
numeric_columns = table.select_dtypes(include=np.number).columns
categorical_columns = table.drop(columns=numeric_columns.tolist()).columns
self.categorical_columns = [
c for c in categorical_columns if c not in self.config.id_columns
]
self.numeric_columns = [
c for c in numeric_columns if c not in self.config.id_columns
]
        for c in self.categorical_columns:
            # keep the three most frequent category *values*; value_counts()
            # is indexed by value, so take the index rather than the counts
            self.top_cat_values[c] = table[c].value_counts().index[:3].tolist()
if self.config.foreign_keys:
aggregated, table = self.aggregate(table)
aggregated = aggregated.bfill().ffill()
self.agg_columns = aggregated.columns
if aggregated.shape[-1] > 0:
self.agg_transformer.fit(aggregated.values)
table = table.bfill().ffill()
if self.categorical_columns:
cat = self.cat_transformer.fit_transform(table[self.categorical_columns].values)
self.split_dim = cat.shape[1]
else:
self.split_dim = 0
if self.numeric_columns:
self.num_transformer.fit(table[self.numeric_columns].values)
def aggregate(self, table: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
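        """Aggregate rows per group of the first foreign key.

        Returns ``(aggregated, table)``: ``aggregated`` has one row per group
        (mean/median/std of numerics, top-category ratios, null ratios, and
        the first ``sortby`` value), while ``table`` has ``sortby`` replaced
        by within-group differences so it can later be cumulatively summed
        back in :meth:`inverse_transform`.
        """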
if not self.config.foreign_keys:
raise RuntimeError(f"Table {self.config.name} has no FK, so aggregate is not a valid operation.")
groupby_columns = self.config.foreign_keys[0].child_column_names
groupby = table.groupby(groupby_columns)
if self.config.sortby:
first_sortby: pd.Series = groupby[self.config.sortby].head(1)
first_sortby.index = pd.MultiIndex.from_frame(
table.loc[first_sortby.index, groupby_columns]
)
            sortby_diff: pd.Series = groupby[self.config.sortby].diff()
            sortby_diff = sortby_diff.fillna(sortby_diff.mean())
            table = pd.concat([
                table.drop(columns=[self.config.sortby]),
                sortby_diff.to_frame(self.config.sortby)
            ], axis=1)[table.columns]
groupby = table.groupby(groupby_columns)
out = self._aggregate_values(groupby)
out = pd.concat([
out, pd.concat({self.config.sortby: first_sortby.to_frame("first")}, axis=1)
], axis=1)
else:
out = self._aggregate_values(groupby)
out.columns = pd.Index([f"{a}${b}" for a, b in out.columns])
return out, table
def _aggregate_values(self, groupby):
if self.numeric_columns:
num_groupby = groupby[self.numeric_columns]
out = num_groupby.aggregate(["mean", "median", "std"]).fillna(0)
else:
out = pd.concat({"": groupby.size().to_frame()[[]]}, axis=1)
if len(self.categorical_columns) > 0:
cat_groupby = groupby[self.categorical_columns]
out = pd.concat([out, self._aggregate_categorical(cat_groupby)], axis=1)
if len(self.count_null) > 0:
null_groupby = groupby[self.count_null].aggregate(lambda group: group.isna().mean())
null_groupby = pd.concat({"null-ratio": null_groupby}, axis=1).swaplevel(0, 1, axis=1)
out = pd.concat([out, null_groupby], axis=1)
if out.index.nlevels <= 1:
out.index = pd.MultiIndex.from_arrays([out.index], names=[out.index.name])
return out
def _aggregate_categorical(self, grouped: pd.core.groupby.generic.DataFrameGroupBy) -> pd.DataFrame:
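        """Per-group frequency ratios of each column's stored top category values."""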
df = grouped.obj
group_keys = grouped.grouper.names
results = {}
sizes = grouped.size()
for col, values in self.top_cat_values.items():
ctab = pd.crosstab(index=[df[k] for k in group_keys], columns=df[col])
ctab = ctab.reindex(columns=values, fill_value=0)
ctab_ratio = ctab / sizes.loc[ctab.index].values.reshape((-1, 1))
results[col] = ctab_ratio
final = pd.concat(results, axis=1)
return final
def transform(self, table: pd.DataFrame) -> Tuple[
np.ndarray, Optional[Dict[Tuple, np.ndarray]], Optional[np.ndarray], Optional[pd.Index]
]:
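        """Encode ``table`` into ``(transformed, groups, aggregated, agg_index)``.

        ``transformed`` stacks the scaled categorical codes and standardized
        numerics. For tables with foreign keys, ``groups`` maps group keys to
        row positions and ``aggregated`` holds the standardized per-group
        features indexed by ``agg_index``; otherwise these are ``None``.
        """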
table = table.reset_index(drop=True)
if self.config.foreign_keys:
groups = table.groupby(self.config.foreign_keys[0].child_column_names).groups
groups = {
k: v.values for k, v in groups.items()
}
aggregated, table = self.aggregate(table)
if aggregated.index.nlevels <= 1:
groups = {(k,): v for k, v in groups.items()}
agg_index = aggregated.index
if aggregated.shape[-1] > 0:
aggregated = self.agg_transformer.transform(aggregated.values)
else:
aggregated = aggregated.values
else:
groups = None
agg_index = None
aggregated = None
if self.categorical_columns:
cat = self.cat_transformer.transform(
table[self.categorical_columns].values
) / np.array([len(x) for x in self.cat_transformer.categories_]).reshape((1, -1))
else:
cat = np.zeros((table.shape[0], 0))
if self.numeric_columns:
num = self.num_transformer.transform(table[self.numeric_columns].values)
else:
num = np.zeros((table.shape[0], 0))
transformed = np.concatenate([cat, num], axis=1)
return transformed, groups, aggregated, agg_index
def inverse_transform(self, transformed: np.ndarray, groups: Optional[Dict[Tuple, np.ndarray]] = None,
aggregated: Optional[np.ndarray] = None, agg_index: Optional[pd.Index] = None
) -> pd.DataFrame:
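        """Decode an encoded matrix back into a DataFrame, regenerating ID
        columns, restoring group key columns, and (when ``sortby`` is set)
        recovering absolute values by cumulatively summing the differences."""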
        if self.categorical_columns:
            cat = transformed[:, :self.split_dim]
            # undo the [0, 1) scaling applied in `transform` before decoding:
            # multiply back by the category counts, then clip to valid codes
            n_categories = np.array([x.shape[0] for x in self.cat_transformer.categories_])
            cat = np.clip(cat * n_categories.reshape((1, -1)), 0, n_categories - 1).round()
            cat = self.cat_transformer.inverse_transform(cat)
            cat = pd.DataFrame(cat, columns=self.categorical_columns)
else:
cat = pd.DataFrame(index=np.arange(transformed.shape[0]), columns=[])
if self.numeric_columns:
num = self.num_transformer.inverse_transform(transformed[:, self.split_dim:])
num = pd.DataFrame(num, columns=self.numeric_columns)
else:
num = pd.DataFrame(index=np.arange(transformed.shape[0]), columns=[])
table = pd.concat([cat, num], axis=1)
for c in self.config.id_columns:
table[c] = np.arange(table.shape[0])
table = table[self.columns]
if self.config.foreign_keys:
groupby_columns = self.config.foreign_keys[0].child_column_names
for vals, idx in groups.items():
table.loc[idx, groupby_columns] = pd.Series(
{c: v for c, v in zip(groupby_columns, vals)}
).to_frame().T.loc[[0] * idx.shape[0]].set_axis(idx, axis=0)
if self.config.sortby:
aggregated = self.agg_transformer.inverse_transform(aggregated)
aggregated = pd.DataFrame(aggregated, index=agg_index, columns=self.agg_columns)
first_sortby = aggregated[f"{self.config.sortby}$first"]
head = table.groupby(groupby_columns)[groupby_columns].head(1)
agg_idx_to_table_idx = {
tuple(row[groupby_columns]): i for i, row in head.iterrows()
}
first_sortby.index = [agg_idx_to_table_idx[x] for x in first_sortby.index]
table.loc[head.index, self.config.sortby] = first_sortby
table[self.config.sortby] = table.groupby(groupby_columns)[self.config.sortby].cumsum()
return table
@classmethod
def load(cls, path: str) -> Self:
return load_from(path)
def save(self, path: str):
save_to(self, path)
class RelationalTransformer:
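    """Coordinates per-table transformers across a multi-table schema.

    Tables are processed in the given ``order`` (parents before children).
    Fitting caches encoded tables to disk, extends each table with
    PCA-compressed context from related tables (capped at ``max_ctx_dim``
    dimensions per block), and exposes load/save hooks used during sampling.
    """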
def __init__(self,
tables: Dict[str, TableConfig],
order: List[str],
max_ctx_dim: int = 100):
self.order = order
self.transformers = {}
self.children: Dict[str, List[ForeignKey]] = defaultdict(list)
for tn in order:
config = tables[tn]
self.transformers[tn] = TableTransformer(config)
for fk in config.foreign_keys:
self.children[fk.parent_table_name].append(fk)
self.max_ctx_dim = max_ctx_dim
self._fitted_cache_dir = None
self._sizes_of = {}
self._nullable = {}
self._parent_dims = {}
self._core_dims = {}
def fit(self, tables: Dict[str, str], cache_dir: str = "./cache", resource_path: str = "./cache/resource.csv"):
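        """Fit all table transformers from the CSV paths in ``tables`` and
        cache encoded tables, per-FK degree targets, and null-indicator
        training pairs under ``cache_dir``."""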
self._fitted_cache_dir = cache_dir
os.makedirs(cache_dir, exist_ok=True)
for tn in self.order:
table = pd.read_csv(tables[tn])
self._sizes_of[tn] = table.shape[0]
table.to_csv(os.path.join(cache_dir, f"{tn}.csv"), index=False)
with log_resource_usage(resource_path, f"fit table {tn} transformer"):
transformer = self.transformers[tn]
if len(set(table.columns)) != len(table.columns):
raise ValueError(f"Same column name repeated in one table ({tn}).")
transformer.fit(table)
transformer.save(os.path.join(cache_dir, f"{tn}-transformer.pkl"))
foreign_keys = self.transformers[tn].config.foreign_keys
if foreign_keys:
self._nullable[tn] = []
with log_resource_usage(resource_path, f"transform {tn} FK"):
encoded, groups, aggregated, agg_index = transformer.transform(table)
save_to({
"actual": (None, None, encoded, None)
}, os.path.join(cache_dir, f"{tn}.pkl"))
with log_resource_usage(resource_path, f"extend {tn}"):
key, context, new_encoded = self._extend_till(tn, tn, table.columns.tolist(), cache_dir)
float_cols = [
c for c in key.select_dtypes(include="float").columns
if c not in self.transformers[tn].config.id_columns
]
if np.abs(encoded - new_encoded[:, self._core_dims[tn]]).mean() > 1e-5 or not (
key.equals(table) or ((len(float_cols) == 0 or
(key[float_cols] - table[float_cols]).abs().values.mean() <= 1e-5)
and key.drop(columns=float_cols).equals(table.drop(columns=float_cols)))
):
raise RuntimeError(
f"Error when extending: {np.abs(encoded - new_encoded[:, self._core_dims[tn]]).mean()}, "
f"{key.equals(table)}, {len(float_cols)}, "
f"{(key[float_cols] - table[float_cols]).abs().values.mean()}, "
f"{key.drop(columns=float_cols).equals(table.drop(columns=float_cols))}."
)
agg_context = np.zeros((aggregated.shape[0], 0))
actual_context = np.zeros((aggregated.shape[0], 0))
transformed_context = np.zeros((encoded.shape[0], 0))
length = np.zeros(aggregated.shape[0])
all_fk_info = []
for fi, fk in enumerate(foreign_keys):
fk_info = {}
with log_resource_usage(
resource_path, f"get degrees {tn}.({'|'.join(fk.child_column_names)})[{fi}]"
):
parent_key, parent_context, parent_encoded = self._extend_till(
fk.parent_table_name, tn, fk.parent_column_names, cache_dir, fitting=False, queue=[fk]
)
degree_x = np.concatenate([parent_context, parent_encoded], axis=1)
degree_y = table[fk.child_column_names].groupby(fk.child_column_names).size()
if degree_y.index.nlevels <= 1:
degree_y.index = pd.MultiIndex.from_arrays([degree_y.index], names=[degree_y.index.name])
if fi == 0:
raw_degree = degree_y[agg_index]
else:
raw_degree = None
parent_key_as_child = parent_key.rename(columns={
p: c for p, c in zip(fk.parent_column_names, fk.child_column_names)
})
y_order = pd.MultiIndex.from_frame(parent_key_as_child)
placeholder_degree_y = pd.Series(0, index=y_order)
placeholder_degree_y.loc[degree_y.index] = degree_y
degree_y = placeholder_degree_y.values
if fi == 0:
with log_resource_usage(resource_path, f"get context {tn}"):
non_zero_degree_x = pd.DataFrame(
degree_x, columns=[f"_dim{i:02d}" for i in range(degree_x.shape[-1])],
index=parent_key.index
)
non_zero_degree_x = pd.concat([parent_key_as_child, non_zero_degree_x], axis=1)
agg_context = agg_index.to_frame().reset_index(drop=True)
agg_context = agg_context.merge(
non_zero_degree_x, how="left", on=agg_index.names
)
agg_context = agg_context.set_index(agg_index.names)
if agg_context.index.nlevels <= 1:
agg_context.index = pd.MultiIndex.from_arrays(
[agg_context.index], names=agg_index.names
)
length = raw_degree.values
agg_context = agg_context.loc[agg_index].values
actual_context = np.concatenate([agg_context, aggregated], axis=1)
actual_context = pd.DataFrame(actual_context, index=agg_index)
transformed_context = np.empty((encoded.shape[0], actual_context.shape[-1]))
for g, idx in groups.items():
transformed_context[idx] = actual_context.loc[g]
actual_context = actual_context.values
fk_info["degree"] = degree_x, degree_y
if table[fk.child_column_names].isna().any().any():
with log_resource_usage(
resource_path, f"get isna {tn}.({'|'.join(fk.child_column_names)})[{fi}]"
):
isna_y = table[fk.child_column_names].isna().any(axis=1)
fk_info["isna"] = np.concatenate([transformed_context, new_encoded], axis=1), isna_y.values
self._nullable[tn].append(True)
else:
self._nullable[tn].append(False)
all_fk_info.append(fk_info)
out = {
"aggregated": (agg_context, aggregated),
"actual": (
actual_context, length, new_encoded,
[groups[tuple(x) if isinstance(x, tuple) else (x,)] for x in agg_index]
),
"foreign_keys": all_fk_info,
}
else:
encoded, _, _, _ = transformer.transform(table)
out = {
"encoded": encoded
}
save_to(out, os.path.join(cache_dir, f"{tn}.pkl"))
def _extend_till(self, table: str, till: str, keys: Sequence[str], cache_dir: str,
fitting: bool = True, queue: List[ForeignKey] = []) -> Tuple[pd.DataFrame, np.ndarray, np.ndarray]:
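        """Recursively extend ``table`` with encoded context from related tables.

        Joins in encodings of parents (via foreign keys) and of allowed child
        tables (aggregated by mean/std per parent key), reducing each context
        block with PCA when it exceeds ``max_ctx_dim``. ``queue`` tracks the
        FK path already taken so the same edge is not revisited. Returns the
        requested ``keys`` columns as a DataFrame plus context and encoded
        arrays aligned row-wise with the raw table.
        """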
allowed_tables = self.order[:self.order.index(till)]
raw = pd.read_csv(os.path.join(cache_dir, f"{table}.csv"))
if self.transformers[table].config.foreign_keys:
_, _, encoded, _ = self.actual_generation_for(table, cache_dir)
else:
encoded = self.standalone_encoded_for(table, cache_dir)
core_columns = [f"_dim{i:02d}" for i in range(encoded.shape[-1])]
core = pd.DataFrame(encoded, columns=core_columns, index=raw.index)
core = pd.concat([raw.index.to_frame(False, "_id"), raw, core], axis=1)
for fi, fk in enumerate(self.transformers[table].config.foreign_keys):
if fk in queue:
continue
parent_raw, parent_context, parent_encoded = self._extend_till(
fk.parent_table_name, till, fk.parent_column_names, cache_dir, fitting, queue + [fk]
)
parent_encoded = np.concatenate([parent_context, parent_encoded], axis=1)
if table == till:
parent_encoded = self._reduce_dims(
parent_encoded, fk.parent_table_name, fitting, queue + [fk], cache_dir, allowed_tables
)
parent_encoded = pd.DataFrame(
parent_encoded, columns=[f"_dim{i:02d}_p{fi}" for i in range(parent_encoded.shape[-1])],
index=np.arange(parent_encoded.shape[0])
)
parent_idx_df = parent_raw[fk.parent_column_names].rename(columns={
p: c for p, c in zip(fk.parent_column_names, fk.child_column_names)
})
parent_encoded = pd.concat([parent_idx_df, parent_encoded], axis=1)
core = core.merge(parent_encoded, on=fk.child_column_names, how="left").fillna(-1)
for fi, fk in enumerate(self.children[table]):
if fk.child_table_name not in allowed_tables or fk in queue:
continue
sibling_raw, sibling_context, sibling_encoded = self._extend_till(
fk.child_table_name, till, fk.child_column_names, cache_dir, fitting, queue + [fk]
)
sibling_encoded = np.concatenate([sibling_context, sibling_encoded], axis=1)
sibling_encoded = self._reduce_dims(
sibling_encoded, fk.child_table_name, fitting, queue + [fk], cache_dir, allowed_tables
)
encoded_columns = [f"_dim{i:02d}_c{fi}" for i in range(sibling_encoded.shape[-1])]
sibling_encoded = pd.DataFrame(
sibling_encoded, columns=encoded_columns, index=np.arange(sibling_encoded.shape[0])
)
sibling_idx_df = sibling_raw[fk.child_column_names].rename(columns={
c: p for c, p in zip(fk.child_column_names, fk.parent_column_names)
})
sibling_encoded = pd.concat([sibling_idx_df, sibling_encoded], axis=1)
sibling_encoded_aggregated = sibling_encoded.groupby(fk.parent_column_names).aggregate(["mean", "std"])
sibling_encoded_aggregated = sibling_encoded_aggregated.reset_index()
sibling_encoded_aggregated.columns = pd.Index([
f"{a}${b}" if b else a for a, b in sibling_encoded_aggregated.columns
])
core = core.merge(
sibling_encoded_aggregated, on=fk.parent_column_names, how="left"
).fillna(0)
core = core.set_index("_id").loc[raw.index]
raw_keys = raw[keys]
context_columns = [c for c in core.columns if c.startswith("_dim") and c.endswith("_p0")]
context = core[context_columns]
encoded = core.drop(columns=context_columns + raw.columns.tolist())
if fitting and table == till:
parent_dims = []
name_to_id = {
c: i for i, c in enumerate(encoded.columns)
}
for fi in range(0, len(self.transformers[table].config.foreign_keys)):
parent_dims.append([
name_to_id[n] for n in encoded.columns if n.endswith(f"_p{fi}") and n.startswith("_dim")
])
self._parent_dims[table] = parent_dims
self._core_dims[table] = [name_to_id[n] for n in core_columns]
        if raw_keys.shape[0] != encoded.values.shape[0]:
            raise RuntimeError(
                f"Extended table shape changed: {(raw_keys.shape, raw.shape, encoded.shape)}"
            )  # TODO: remove
return raw_keys, context.values, encoded.values
def _reduce_dims(self, parent_encoded: np.ndarray, table: str, fitting: bool, queue: List[ForeignKey],
cache_dir: str, allowed_tables: List[str]) -> np.ndarray:
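        """PCA-compress a context block to ``max_ctx_dim`` dimensions when it
        is wider than that, fitting and caching one PCA per (table, depth,
        hashed FK path) during fitting and reloading it afterwards."""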
if parent_encoded.shape[-1] > self.max_ctx_dim:
queue_str = json.dumps([
f"parent={qfk.parent_table_name}, child={qfk.child_table_name}, "
f"columns={qfk.child_column_names}" for qfk in queue
])
pca_name = f"{table}_{len(allowed_tables)}_{hashlib.sha1(queue_str.encode()).hexdigest()}"
os.makedirs(os.path.join(cache_dir, "pca"), exist_ok=True)
pca_path = os.path.join(cache_dir, "pca", f"{pca_name}.pkl")
if fitting:
if os.path.exists(pca_path):
raise FileExistsError(f"File for PCA already exists: {table} {allowed_tables[-1]} {queue}.")
pca = PCA(n_components=self.max_ctx_dim)
parent_encoded = pca.fit_transform(parent_encoded)
save_to(pca, pca_path)
else:
pca = load_from(pca_path)
parent_encoded = pca.transform(parent_encoded)
return parent_encoded
def fitted_size_of(self, table_name: str) -> int:
return self._sizes_of[table_name]
@classmethod
def standalone_encoded_for(cls, table_name: str, cache_dir: str = "./cache") -> np.ndarray:
return load_from(os.path.join(cache_dir, f"{table_name}.pkl"))["encoded"]
@classmethod
def degree_prediction_for(cls, table_name: str, fk_idx: int, cache_dir: str = "./cache") -> Tuple[
np.ndarray, Optional[np.ndarray]
]:
return load_from(os.path.join(cache_dir, f"{table_name}.pkl"))["foreign_keys"][fk_idx]["degree"]
@classmethod
def isna_indicator_prediction_for(cls, table_name: str, fk_idx: int, cache_dir: str = "./cache") -> Optional[Tuple[
np.ndarray, Optional[np.ndarray]
]]:
return load_from(os.path.join(cache_dir, f"{table_name}.pkl"))["foreign_keys"][fk_idx].get("isna")
@classmethod
def aggregated_generation_for(cls, table_name: str, cache_dir: str = "./cache") -> Tuple[
np.ndarray, Optional[np.ndarray]
]:
return load_from(os.path.join(cache_dir, f"{table_name}.pkl"))["aggregated"]
@classmethod
def actual_generation_for(cls, table_name: str, cache_dir: str = "./cache") -> Tuple[
np.ndarray, np.ndarray, Optional[np.ndarray], Optional[List[np.ndarray]]
]:
return load_from(os.path.join(cache_dir, f"{table_name}.pkl"))["actual"]
def fk_matching_for(self, table_name: str, fk_idx: int, sampled_dir: str = "./cache") -> Tuple[
np.ndarray, np.ndarray, np.ndarray, np.ndarray, List[Optional[np.ndarray]], List[np.ndarray]
]:
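        """Assemble everything needed to match child rows to parent rows for one FK.

        Returns the child encodings, reduced parent encodings, predicted
        degrees, a null mask, per-row candidate pools restricted by
        overlapping FKs and inequality constraints, and index groups that
        must stay unique to respect the primary key.
        """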
loaded = load_from(os.path.join(sampled_dir, f"{table_name}.pkl"))
_, _, values, groups = loaded["actual"]
values = values[:, self._parent_dims[table_name][fk_idx]]
parent, degrees = loaded["foreign_keys"][fk_idx]["degree"]
fk = self.transformers[table_name].config.foreign_keys[fk_idx]
parent = self._reduce_dims(
parent, fk.parent_table_name,
False, [fk], self._fitted_cache_dir, self.order[:self.order.index(table_name)]
)
if values.shape[-1] != parent.shape[-1]:
raise RuntimeError(f"The sizes to be matched are different: {values.shape}, {parent.shape}.")
isnull = loaded["foreign_keys"][fk_idx]["isna"]
if isnull is None:
return_isna = np.zeros(values.shape[0], dtype=np.bool_)
else:
_, return_isna = isnull
# collect prev FK values
key_df = pd.DataFrame(index=pd.RangeIndex(values.shape[0]))
for i, fk in enumerate(self.transformers[table_name].config.foreign_keys[:fk_idx]):
parent_match = loaded["foreign_keys"][i]["match"]
existing_vals = pd.read_csv(
os.path.join(sampled_dir, f"{fk.parent_table_name}.csv")
).rename(
columns={p: c for p, c in zip(fk.parent_column_names, fk.child_column_names)}
)[fk.child_column_names]
isna = np.isnan(parent_match.astype(np.float32))
if isna.any():
dummy_idx = existing_vals.shape[0]
existing_vals.loc[dummy_idx, existing_vals.columns] = np.nan
existing_vals = existing_vals.iloc[
np.where(isna, dummy_idx, parent_match)
].reset_index(drop=True)
else:
existing_vals = existing_vals.iloc[parent_match].reset_index(drop=True)
if set(key_df.columns) & set(existing_vals.columns):
same_cols = [*set(key_df.columns) & set(existing_vals.columns)]
if key_df[same_cols][~return_isna].equals(
existing_vals[same_cols][~return_isna].astype(key_df[same_cols].dtypes)
):
new_cols = [*set(existing_vals.columns) - set(key_df.columns)]
key_df[new_cols] = existing_vals[new_cols]
else:
raise RuntimeError(f"Overlapping FKs in previous FKs invalid ({table_name})[{fk_idx}].")
else:
key_df[fk.child_column_names] = existing_vals
pools = [None] * values.shape[0]
# overlapping FKs result in limited pools
curr_fk = self.transformers[table_name].config.foreign_keys[fk_idx]
prev_fk_cols = set()
this_parent_raw = pd.read_csv(os.path.join(sampled_dir, f"{curr_fk.parent_table_name}.csv")).rename(
columns={p: c for p, c in zip(curr_fk.parent_column_names, curr_fk.child_column_names)}
)
for i, fk in enumerate(self.transformers[table_name].config.foreign_keys[:fk_idx]):
set1_cols = set(curr_fk.child_column_names) & set(fk.child_column_names)
if set1_cols:
set1_cols = [*set1_cols]
existing_vals = key_df[set1_cols]
this_parent_to_overlap_grouped = this_parent_raw[set1_cols].groupby(set1_cols)
for ov, rows in existing_vals.groupby(set1_cols):
try:
this_parent_rows = this_parent_to_overlap_grouped.get_group(ov)
allowed_choices = this_parent_rows.index.values
for r in rows.index:
if pools[r] is None:
pools[r] = allowed_choices
else:
pools[r] = np.intersect1d(pools[r], allowed_choices)
except KeyError:
pass
prev_fk_cols |= set(fk.child_column_names)
curr_fk_cols = set(curr_fk.child_column_names)
all_fk_cols = prev_fk_cols | curr_fk_cols
# inequality results in limited pools
for (a, b) in self.transformers[table_name].config.inequality:
this_ineq_cols = set(a) | set(b)
if this_ineq_cols <= all_fk_cols and not this_ineq_cols <= prev_fk_cols:
for i, fk in enumerate(self.transformers[table_name].config.foreign_keys[:fk_idx]):
set1_cols = set(fk.child_column_names) & this_ineq_cols
set2_cols = this_ineq_cols - prev_fk_cols
if set1_cols:
if set1_cols & set(a):
set1_cols = [x for x in a if x in set1_cols]
set2_cols = [x for x in b if x in set2_cols]
else:
set1_cols = [x for x in b if x in set1_cols]
set2_cols = [x for x in a if x in set2_cols]
existing_vals = key_df[set1_cols]
this_parent_to_overlap_grouped = this_parent_raw[set2_cols].groupby(set2_cols)
for ov, rows in existing_vals.groupby(set1_cols):
try:
this_parent_rows = this_parent_to_overlap_grouped.get_group(ov)
disallowed_choices = this_parent_rows.index.values
for r in rows.index:
if pools[r] is None:
pools[r] = np.setdiff1d(np.arange(this_parent_raw.shape[0]), disallowed_choices)
else:
pools[r] = np.setdiff1d(pools[r], disallowed_choices)
except KeyError:
pass
# uniqueness constraints of uniqueness groups
uniqueness_groups = []
if self.transformers[table_name].config.primary_key:
pk_cols = set(self.transformers[table_name].config.primary_key)
if pk_cols <= (curr_fk_cols | prev_fk_cols) and not pk_cols <= prev_fk_cols:
core_cols = [*pk_cols & prev_fk_cols]
for g, d in key_df.groupby(core_cols):
uniqueness_groups.append(d.index.values)
return values, parent, degrees, return_isna, pools, uniqueness_groups
def prepare_sampled_dir(self, sampled_dir: str):
if os.path.exists(sampled_dir):
shutil.rmtree(sampled_dir)
os.makedirs(sampled_dir, exist_ok=True)
if os.path.exists(os.path.join(self._fitted_cache_dir, "pca")):
shutil.copytree(os.path.join(self._fitted_cache_dir, "pca"), os.path.join(sampled_dir, "pca"))
@classmethod
def save_standalone_encoded_for(cls, table_name: str, encoded: np.ndarray, sampled_dir: str = "./sampled"):
save_to({"encoded": encoded}, os.path.join(sampled_dir, f"{table_name}.pkl"))
@classmethod
def save_degree_for(cls, table_name: str, fk_idx: int, degree: np.ndarray, sampled_dir: str = "./sampled"):
loaded = load_from(os.path.join(sampled_dir, f"{table_name}.pkl"))
x, _ = loaded["foreign_keys"][fk_idx]["degree"]
loaded["foreign_keys"][fk_idx]["degree"] = x, degree
if fk_idx == 0:
a, b, c, d = loaded.get("actual", (None, None, None, None))
non_zero_deg = degree > 0
loaded["actual"] = a, degree[non_zero_deg], c, d
non_zero_x = x[non_zero_deg]
loaded["aggregated"] = non_zero_x, None
save_to(loaded, os.path.join(sampled_dir, f"{table_name}.pkl"))
def save_isna_indicator_for(self, table_name: str, fk_idx: int, isna: np.ndarray, sampled_dir: str = "./sampled"):
loaded = load_from(os.path.join(sampled_dir, f"{table_name}.pkl"))
x, _ = loaded["foreign_keys"][fk_idx]["isna"]
loaded["foreign_keys"][fk_idx]["isna"] = x, isna
a, b, encoded, d = loaded["actual"]
encoded[np.ix_(isna, self._parent_dims[table_name][fk_idx])] = 0
loaded["actual"] = a, b, encoded, d
save_to(loaded, os.path.join(sampled_dir, f"{table_name}.pkl"))
@classmethod
def save_aggregated_info_for(cls, table_name: str, aggregated: np.ndarray, sampled_dir: str = "./sampled"):
loaded = load_from(os.path.join(sampled_dir, f"{table_name}.pkl"))
agg_context, _ = loaded["aggregated"]
loaded["aggregated"] = agg_context, aggregated
actual_context = np.concatenate([agg_context, aggregated], axis=1)
_, length, _, _ = loaded["actual"]
loaded["actual"] = actual_context, length, None, None
save_to(loaded, os.path.join(sampled_dir, f"{table_name}.pkl"))
@classmethod
def save_actual_values_for(
cls, table_name: str, values: np.ndarray, groups: List[np.ndarray], sampled_dir: str = "./sampled"
):
loaded = load_from(os.path.join(sampled_dir, f"{table_name}.pkl"))
context, length, _, _ = loaded["actual"]
length = np.array([len(x) for x in groups])
loaded["actual"] = context, length, values, groups
for i, fk in enumerate(loaded["foreign_keys"]):
isnull = fk["isna"]
if isnull is not None:
cids = np.repeat(np.arange(context.shape[0]), length.astype(int))
loaded["foreign_keys"][i]["isna"] = np.concatenate([context[cids], values], axis=1), None
break
save_to(loaded, os.path.join(sampled_dir, f"{table_name}.pkl"))
def save_matched_indices_for(self, table_name: str, fk_idx: int,
indices: np.ndarray, sampled_dir: str = "./sampled"):
loaded = load_from(os.path.join(sampled_dir, f"{table_name}.pkl"))
loaded["foreign_keys"][fk_idx]["match"] = indices
context, length, encoded, d = loaded["actual"]
parent, _ = loaded["foreign_keys"][fk_idx]["degree"]
isna = np.isnan(indices.astype(np.float32))
if self._parent_dims[table_name][fk_idx]:
fk = self.transformers[table_name].config.foreign_keys[fk_idx]
encoded[np.ix_(np.nonzero(~isna)[0], self._parent_dims[table_name][fk_idx])] = self._reduce_dims(
parent[indices[~isna].astype(np.int32)], fk.parent_table_name, False, [fk],
self._fitted_cache_dir, self.order[:self.order.index(table_name)]
)
loaded["actual"] = context, length, encoded, d
for i, fk in enumerate(loaded["foreign_keys"]):
if i <= fk_idx:
continue
isnull = fk["isna"]
if isnull is not None:
cids = np.repeat(np.arange(context.shape[0]), length.astype(int))
loaded["foreign_keys"][i]["isna"] = np.concatenate([context[cids], encoded], axis=1), None
break
save_to(loaded, os.path.join(sampled_dir, f"{table_name}.pkl"))
def copy_fitted_for(self, table_name: str, sampled_dir: str = "./sampled"):
shutil.copyfile(os.path.join(self._fitted_cache_dir, f"{table_name}.pkl"),
os.path.join(sampled_dir, f"{table_name}.pkl"))
shutil.copyfile(os.path.join(self._fitted_cache_dir, f"{table_name}.csv"),
os.path.join(sampled_dir, f"{table_name}.csv"))
def prepare_next_for(self, table_name: str, sampled_dir: str = "./cache"):
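        """Decode the sampled table to CSV (filling FK columns from matched
        parent rows) and seed the next table's pickle with degree inputs and
        isna placeholders derived from the freshly written parents."""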
if self.transformers[table_name].config.foreign_keys:
_, aggregated = self.aggregated_generation_for(table_name, sampled_dir)
_, _, encoded, indices = self.actual_generation_for(table_name, sampled_dir)
_, deg = self.degree_prediction_for(table_name, 0, sampled_dir)
foreign_keys = self.transformers[table_name].config.foreign_keys
fk = foreign_keys[0]
parent = pd.read_csv(os.path.join(sampled_dir, f"{fk.parent_table_name}.csv"))
            parent_idx = pd.MultiIndex.from_frame(parent[fk.parent_column_names].rename(columns={
                p: c for p, c in zip(fk.parent_column_names, fk.child_column_names)
            }))[deg > 0]
groups = {
pi: idx for pi, idx in zip(parent_idx, indices)
}
recovered = self.transformers[table_name].inverse_transform(
encoded[:, self._core_dims[table_name]], groups, aggregated, parent_idx
)
occurred_cols = set()
if len(foreign_keys) > 1:
loaded = load_from(os.path.join(sampled_dir, f"{table_name}.pkl"))
else:
loaded = None
for i, fk in enumerate(foreign_keys):
if i == 0:
occurred_cols |= set(fk.child_column_names)
continue
new_cols = [c for c in fk.child_column_names if c not in occurred_cols]
match_indices = loaded["foreign_keys"][i]["match"]
parent_table = pd.read_csv(os.path.join(sampled_dir, f"{fk.parent_table_name}.csv"))
dummy_index = parent_table.shape[0]
parent_table.loc[dummy_index, parent_table.columns] = np.nan
recovered.loc[:, new_cols] = parent_table.iloc[np.where(
np.isnan(match_indices.astype(np.float32)), dummy_index, match_indices
)].rename(
columns={p: c for p, c in zip(fk.parent_column_names, fk.child_column_names)}
)[new_cols].set_axis(recovered.index, axis=0)
occurred_cols |= set(fk.child_column_names)
else:
encoded = self.standalone_encoded_for(table_name, sampled_dir)
recovered = self.transformers[table_name].inverse_transform(encoded)
recovered.to_csv(os.path.join(sampled_dir, f"{table_name}.csv"), index=False)
table_idx = self.order.index(table_name)
if table_idx >= len(self.order) - 1:
return
next_table_name = self.order[table_idx + 1]
degrees = []
for i, fk in enumerate(self.transformers[next_table_name].config.foreign_keys):
parent_raw, parent_context, parent_encoded = self._extend_till(
fk.parent_table_name, next_table_name, fk.parent_column_names, sampled_dir, False, [fk]
)
parent_extend_till = np.concatenate([parent_context, parent_encoded], axis=1)
degrees.append(parent_extend_till)
save_to({
"foreign_keys": [{
"degree": (x, None), "isna": (None, None) if y else None
} for x, y in zip(degrees, self._nullable.get(next_table_name, []))]
}, os.path.join(sampled_dir, f"{next_table_name}.pkl"))
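
# A minimal usage sketch (hypothetical schema and file names; sampling-time
# hooks such as `save_degree_for` / `fk_matching_for` are driven by the
# generation pipeline and are omitted here):
#
#     configs = {
#         "users": TableConfig(name="users", primary_key="user_id",
#                              id_columns=["user_id"]),
#         "orders": TableConfig(
#             name="orders", primary_key="order_id", id_columns=["order_id"],
#             foreign_keys=[ForeignKey("orders", "users", "user_id")],
#             sortby="timestamp",
#         ),
#     }
#     rt = RelationalTransformer(configs, order=["users", "orders"])
#     rt.fit({"users": "users.csv", "orders": "orders.csv"}, cache_dir="./cache")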