|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import math |
|
|
from typing import Dict, List, Tuple |
|
|
|
|
|
import numpy as np |
|
|
from numpy import ndarray |
|
|
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, RobustScaler |
|
|
|
|
|
from nemo.utils import logging |
|
|
|
|
|
__all__ = ["IntCode", "FloatCode", "CategoryCode", "ColumnCodes"] |
|
|
|
|
|
|
|
|
class Code(object): |
|
|
def compute_code(self, data_series: ndarray): |
|
|
""" |
|
|
@params: |
|
|
data_series: an array of input data used to calculate mapping |
|
|
""" |
|
|
raise NotImplementedError() |
|
|
|
|
|
def __init__(self, col_name: str, code_len: int, start_id: int, fillall: bool = True, hasnan: bool = True): |
|
|
""" |
|
|
@params: |
|
|
col_name: name of the column |
|
|
code_len: number of tokens used to code the column. |
|
|
start_id: offset for token_id. |
|
|
fillall: if True, reserve space for digit number even the digit number is |
|
|
not present in the data_series. Otherwise, only reserve space for the numbers |
|
|
in the data_series. |
|
|
hasnan: if True, reserve space for nan |
|
|
""" |
|
|
self.name = col_name |
|
|
self.code_len = code_len |
|
|
self.start_id = start_id |
|
|
self.end_id = start_id |
|
|
self.fillall = fillall |
|
|
self.hasnan = hasnan |
|
|
|
|
|
def encode(self, item: str) -> List[int]: |
|
|
raise NotImplementedError() |
|
|
|
|
|
def decode(self, ids: List[int]) -> str: |
|
|
raise NotImplementedError() |
|
|
|
|
|
@property |
|
|
def code_range(self) -> List[Tuple[int, int]]: |
|
|
""" |
|
|
get the vocab id range for each of the encoded tokens |
|
|
@returns [(min, max), (min, max), ...] |
|
|
""" |
|
|
return [(self.start_id, self.end_id)] |
|
|
|
|
|
|
|
|
class IntCode(Code): |
|
|
def __init__( |
|
|
self, col_name: str, code_len: int, start_id: int, fillall: bool = True, base: int = 100, hasnan: bool = True |
|
|
): |
|
|
super().__init__(col_name, code_len, start_id, fillall, hasnan) |
|
|
self.base = base |
|
|
self.int_min: int = None |
|
|
|
|
|
def compute_code(self, data_series: ndarray): |
|
|
significant_val = self.array_convert_to_int(data_series) |
|
|
|
|
|
digits_id_to_item = [{} for _ in range(self.code_len)] |
|
|
digits_item_to_id = [{} for _ in range(self.code_len)] |
|
|
for i in range(self.code_len): |
|
|
id_to_item = digits_id_to_item[i] |
|
|
item_to_id = digits_item_to_id[i] |
|
|
v = (significant_val // self.base ** i) % self.base |
|
|
if self.fillall: |
|
|
uniq_items = range(0, self.base) |
|
|
else: |
|
|
uniq_items = sorted(np.unique(v).tolist()) |
|
|
for k in range(len(uniq_items)): |
|
|
item = str(uniq_items[k]) |
|
|
item_to_id[item] = self.end_id |
|
|
id_to_item[self.end_id] = item |
|
|
self.end_id += 1 |
|
|
self.digits_id_to_item = digits_id_to_item |
|
|
self.digits_item_to_id = digits_item_to_id |
|
|
self.NA_token = 'nan' |
|
|
if self.hasnan: |
|
|
self.end_id += 1 |
|
|
codes = [] |
|
|
ranges = self.code_range |
|
|
for i in ranges: |
|
|
codes.append(i[1] - 1) |
|
|
self.NA_token_id = codes |
|
|
|
|
|
def array_convert_to_int(self, val: ndarray): |
|
|
val = val.astype(int) |
|
|
self.int_min = val.min() |
|
|
return val - self.int_min |
|
|
|
|
|
def convert_to_int(self, val: float) -> int: |
|
|
return int(val) - self.int_min |
|
|
|
|
|
def reverse_convert_to_int(self, val: int) -> int: |
|
|
return val + self.int_min |
|
|
|
|
|
@property |
|
|
def code_range(self) -> List[Tuple[int, int]]: |
|
|
""" |
|
|
get the vocab id range for each of the encoded tokens |
|
|
@returns [(min, max), (min, max), ...] |
|
|
""" |
|
|
|
|
|
outputs = [] |
|
|
c = 0 |
|
|
for i in reversed(range(self.code_len)): |
|
|
ids = self.digits_id_to_item[i].keys() |
|
|
if c == 0: |
|
|
if self.hasnan: |
|
|
outputs.append((min(ids), max(ids) + 2)) |
|
|
else: |
|
|
outputs.append((min(ids), max(ids) + 1)) |
|
|
else: |
|
|
outputs.append((min(ids), max(ids) + 1)) |
|
|
c += 1 |
|
|
return outputs |
|
|
|
|
|
def encode(self, item: str) -> List[int]: |
|
|
if self.hasnan and item == self.NA_token: |
|
|
return self.NA_token_id |
|
|
elif not self.hasnan and item == self.NA_token: |
|
|
raise ValueError(f"colum {self.name} cannot handle nan, please set hasnan=True") |
|
|
val = float(item) |
|
|
val_int = self.convert_to_int(val) |
|
|
digits = [] |
|
|
for i in range(self.code_len): |
|
|
digit = (val_int // self.base ** i) % self.base |
|
|
digits.append(str(digit)) |
|
|
if (val_int // self.base ** self.code_len) != 0: |
|
|
raise ValueError("not right length") |
|
|
codes = [] |
|
|
for i in reversed(range(self.code_len)): |
|
|
digit_str = digits[i] |
|
|
if digit_str in self.digits_item_to_id[i]: |
|
|
codes.append(self.digits_item_to_id[i][digit_str]) |
|
|
else: |
|
|
|
|
|
allowed_digits = np.array([int(d) for d in self.digits_item_to_id[i].keys()]) |
|
|
near_id = np.argmin(np.abs(allowed_digits - int(digit_str))) |
|
|
digit_str = str(allowed_digits[near_id]) |
|
|
codes.append(self.digits_item_to_id[i][digit_str]) |
|
|
logging.warning('out of domain num is encounterd, use nearest code') |
|
|
return codes |
|
|
|
|
|
def decode(self, ids: List[int]) -> str: |
|
|
if self.hasnan and ids[0] == self.NA_token_id[0]: |
|
|
return self.NA_token |
|
|
v = 0 |
|
|
for i in reversed(range(self.code_len)): |
|
|
digit = int(self.digits_id_to_item[i][ids[self.code_len - i - 1]]) |
|
|
v += digit * self.base ** i |
|
|
v = self.reverse_convert_to_int(v) |
|
|
return str(v) |
|
|
|
|
|
|
|
|
class FloatCode(IntCode): |
|
|
def __init__( |
|
|
self, |
|
|
col_name: str, |
|
|
code_len: int, |
|
|
start_id: int, |
|
|
fillall: bool = True, |
|
|
base: int = 100, |
|
|
hasnan: bool = True, |
|
|
transform: str = 'quantile', |
|
|
): |
|
|
super().__init__(col_name, code_len, start_id, fillall, base, hasnan) |
|
|
if transform == 'yeo-johnson': |
|
|
self.scaler = PowerTransformer(standardize=True) |
|
|
elif transform == 'quantile': |
|
|
self.scaler = QuantileTransformer(output_distribution='uniform', n_quantiles=100) |
|
|
elif transform == 'robust': |
|
|
self.scaler = RobustScaler() |
|
|
else: |
|
|
raise ValueError('Supported data transformations are "yeo-johnson", "quantile", and "robust"') |
|
|
|
|
|
def convert_to_int(self, val: float) -> int: |
|
|
val = np.expand_dims(np.array(val), axis=0) |
|
|
values = self.scaler.transform(val[:, None])[:, 0] - self.mval |
|
|
values = (values * self.base ** self.extra_digits).astype(int) |
|
|
output = values[0] |
|
|
return output |
|
|
|
|
|
def array_convert_to_int(self, val: ndarray): |
|
|
values = self.scaler.fit_transform(val[:, None])[:, 0] |
|
|
self.mval = values.min() |
|
|
values = values - self.mval |
|
|
digits = int(math.log(values.max(), self.base)) + 1 |
|
|
|
|
|
extra_digits = self.code_len - digits |
|
|
if extra_digits < 0: |
|
|
raise ValueError("need large length to code the nummber") |
|
|
self.extra_digits = extra_digits |
|
|
values = (values * self.base ** self.extra_digits).astype(int) |
|
|
return values |
|
|
|
|
|
def reverse_convert_to_int(self, val: int) -> float: |
|
|
val = val / self.base ** self.extra_digits |
|
|
val = np.expand_dims(np.array(val), axis=0) |
|
|
v = self.scaler.inverse_transform(val[:, None] + self.mval)[0, 0] |
|
|
return v |
|
|
|
|
|
def decode(self, ids: List[int]) -> str: |
|
|
if self.hasnan and ids[0] == self.NA_token_id[0]: |
|
|
return self.NA_token |
|
|
v = 0 |
|
|
for i in reversed(range(self.code_len)): |
|
|
digit = int(self.digits_id_to_item[i][ids[self.code_len - i - 1]]) |
|
|
v += digit * self.base ** i |
|
|
v = self.reverse_convert_to_int(v) |
|
|
accuracy = max(int(abs(np.log10(0.1 / self.base ** self.extra_digits))), 1) |
|
|
return f"{v:.{accuracy}f}" |
|
|
|
|
|
|
|
|
class CategoryCode(Code): |
|
|
def __init__(self, col_name: str, start_id: int): |
|
|
super().__init__(col_name, 1, start_id, True, False) |
|
|
|
|
|
def compute_code(self, data_series: ndarray): |
|
|
uniq_items = np.unique(data_series).tolist() |
|
|
id_to_item = {} |
|
|
item_to_id = {} |
|
|
for i in range(len(uniq_items)): |
|
|
item = str(uniq_items[i]) |
|
|
item_to_id[item] = self.end_id |
|
|
id_to_item[self.end_id] = item |
|
|
self.end_id += 1 |
|
|
self.id_to_item = id_to_item |
|
|
self.item_to_id = item_to_id |
|
|
|
|
|
def encode(self, item) -> List[int]: |
|
|
return [self.item_to_id[item]] |
|
|
|
|
|
def decode(self, ids: List[int]) -> str: |
|
|
return self.id_to_item[ids[0]] |
|
|
|
|
|
|
|
|
column_map = {"int": IntCode, "float": FloatCode, "category": CategoryCode} |
|
|
|
|
|
|
|
|
class ColumnCodes(object): |
|
|
def __init__(self): |
|
|
self.column_codes: Dict[str, Code] = {} |
|
|
self.columns = [] |
|
|
self.sizes = [] |
|
|
|
|
|
@property |
|
|
def vocab_size(self): |
|
|
return self.column_codes[self.columns[-1]].end_id |
|
|
|
|
|
def register(self, name: str, ccode: Code): |
|
|
self.columns.append(name) |
|
|
self.column_codes[name] = ccode |
|
|
self.sizes.append(ccode.code_len) |
|
|
|
|
|
def encode(self, col: str, item: str) -> List[int]: |
|
|
if col in self.column_codes: |
|
|
return self.column_codes[col].encode(item) |
|
|
else: |
|
|
raise ValueError(f"cannot encode {col} {item}") |
|
|
|
|
|
def decode(self, col: str, ids: List[int]) -> str: |
|
|
if col in self.column_codes: |
|
|
return self.column_codes[col].decode(ids) |
|
|
else: |
|
|
raise ValueError("cannot decode") |
|
|
|
|
|
def get_range(self, column_id: int) -> List[Tuple[int, int]]: |
|
|
return self.column_codes[self.columns[column_id]].code_range |
|
|
|
|
|
@classmethod |
|
|
def get_column_codes(cls, column_configs, example_arrays): |
|
|
column_codes = cls() |
|
|
beg = 0 |
|
|
cc = None |
|
|
for config in column_configs: |
|
|
col_name = config['name'] |
|
|
coder = column_map[config['code_type']] |
|
|
args = config.get('args', {}) |
|
|
start_id = beg if cc is None else cc.end_id |
|
|
args['start_id'] = start_id |
|
|
args['col_name'] = col_name |
|
|
cc = coder(**args) |
|
|
cc.compute_code(example_arrays[col_name]) |
|
|
column_codes.register(col_name, cc) |
|
|
return column_codes |
|
|
|