| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | import math |
| | from typing import Dict, List, Tuple |
| |
|
| | import numpy as np |
| | from numpy import ndarray |
| | from sklearn.preprocessing import PowerTransformer, QuantileTransformer, RobustScaler |
| |
|
| | from nemo.utils import logging |
| |
|
# Public names exported by a star-import of this module.
__all__ = ["IntCode", "FloatCode", "CategoryCode", "ColumnCodes"]
| |
|
| |
|
class Code:
    """Common interface for objects that turn the values of one column into token ids."""

    def __init__(self, col_name: str, code_len: int, start_id: int, fillall: bool = True, hasnan: bool = True):
        """
        @params:
            col_name: name of the column
            code_len: number of tokens used to code the column.
            start_id: offset for token_id.
            fillall: if True, reserve space for every digit value even when it does not
                occur in the data_series. Otherwise, only reserve space for the values
                found in the data_series.
            hasnan: if True, reserve space for nan
        """
        self.name, self.code_len = col_name, code_len
        # No ids are allocated yet, so the range is empty: end_id == start_id.
        self.start_id = self.end_id = start_id
        self.fillall, self.hasnan = fillall, hasnan

    def compute_code(self, data_series: ndarray):
        """
        Build the item <-> id mapping. Must be provided by subclasses.

        @params:
            data_series: an array of input data used to calculate mapping
        """
        raise NotImplementedError

    def encode(self, item: str) -> List[int]:
        """Turn one item into its token ids. Must be provided by subclasses."""
        raise NotImplementedError

    def decode(self, ids: List[int]) -> str:
        """Turn token ids back into an item string. Must be provided by subclasses."""
        raise NotImplementedError

    @property
    def code_range(self) -> List[Tuple[int, int]]:
        """
        get the vocab id range for each of the encoded tokens
        @returns [(min, max), (min, max), ...]
        """
        bounds = (self.start_id, self.end_id)
        return [bounds]
| |
|
| |
|
class IntCode(Code):
    """
    Codes an integer column as `code_len` digits in base `base`.

    Values are shifted by the minimum seen in `compute_code` so that the coded
    number is non-negative. Every digit position owns its own id range; the ids
    are allocated starting from the least significant position, while encoded
    tokens are emitted most significant position first.
    """

    def __init__(
        self, col_name: str, code_len: int, start_id: int, fillall: bool = True, base: int = 100, hasnan: bool = True
    ):
        """
        @params:
            col_name: name of the column
            code_len: number of digit tokens used to code the column.
            start_id: offset for token_id.
            fillall: if True, reserve an id for every digit 0..base-1 at every position,
                otherwise only for the digits found in the data passed to compute_code.
            base: radix of the positional code.
            hasnan: if True, reserve one extra id for the 'nan' token.
        """
        super().__init__(col_name, code_len, start_id, fillall, hasnan)
        self.base = base
        # Set by array_convert_to_int (i.e. by compute_code); None until then.
        self.int_min: int = None

    def compute_code(self, data_series: ndarray):
        """
        Build the per-position digit <-> id tables from `data_series`.

        Sets `digits_id_to_item`, `digits_item_to_id`, `NA_token`, `end_id` and,
        when `hasnan` is True, `NA_token_id`. Calling it again rebuilds the tables
        starting from `start_id` instead of continuing after the previous `end_id`.

        @params:
            data_series: an array of input data used to calculate mapping
        """
        significant_val = self.array_convert_to_int(data_series)

        # Restart the id allocation so that a refit yields the same id range
        # rather than shifting the vocabulary past the ids of the previous fit.
        self.end_id = self.start_id

        digits_id_to_item = [{} for _ in range(self.code_len)]
        digits_item_to_id = [{} for _ in range(self.code_len)]
        for i in range(self.code_len):
            if self.fillall:
                uniq_items = range(self.base)
            else:
                # Digits actually present at position i (0 = least significant).
                position_digits = (significant_val // self.base ** i) % self.base
                uniq_items = sorted(np.unique(position_digits).tolist())
            for digit in uniq_items:
                item = str(digit)
                digits_item_to_id[i][item] = self.end_id
                digits_id_to_item[i][self.end_id] = item
                self.end_id += 1
        self.digits_id_to_item = digits_id_to_item
        self.digits_item_to_id = digits_item_to_id
        self.NA_token = 'nan'
        if self.hasnan:
            # One extra id, directly after the most significant position's digits.
            self.end_id += 1
            # Per token position: upper bound of its code_range minus one.
            # decode() only inspects the first entry to recognise the nan token.
            self.NA_token_id = [upper - 1 for _, upper in self.code_range]

    def array_convert_to_int(self, val: ndarray):
        """Cast `val` to int, remember its minimum in `int_min` and return `val - int_min`."""
        val = val.astype(int)
        self.int_min = val.min()
        return val - self.int_min

    def convert_to_int(self, val: float) -> int:
        """Map a single value to the shifted integer that gets split into digits."""
        return int(val) - self.int_min

    def reverse_convert_to_int(self, val: int) -> int:
        """Inverse of convert_to_int: add `int_min` back."""
        return val + self.int_min

    @property
    def code_range(self) -> List[Tuple[int, int]]:
        """
        get the vocab id range for each of the encoded tokens
        @returns [(min, max), (min, max), ...]

        Entries are ordered like the encoded tokens (most significant position
        first). `max` is one past the largest digit id of the position; for the
        first token it is one larger still when `hasnan` is True, which leaves
        room for the nan id. Requires compute_code to have been called.
        """
        outputs = []
        for token_pos, i in enumerate(reversed(range(self.code_len))):
            ids = self.digits_id_to_item[i].keys()
            span = 2 if (token_pos == 0 and self.hasnan) else 1
            outputs.append((min(ids), max(ids) + span))
        return outputs

    def encode(self, item: str) -> List[int]:
        """
        Encode `item` into `code_len` token ids, most significant position first.

        A digit that has no id at its position (possible when fillall is False)
        is replaced by the numerically nearest digit that has one, and a warning
        is logged.

        @raises ValueError: if `item` is the nan token but hasnan is False, or if
            the shifted value does not fit into `code_len` digits (this includes
            values below the minimum seen in compute_code).
        """
        if item == self.NA_token:
            if not self.hasnan:
                raise ValueError(f"colum {self.name} cannot handle nan, please set hasnan=True")
            # Return a copy so callers cannot mutate the stored nan code.
            return list(self.NA_token_id)
        val_int = self.convert_to_int(float(item))
        if (val_int // self.base ** self.code_len) != 0:
            raise ValueError("not right length")
        codes = []
        for i in reversed(range(self.code_len)):
            item_to_id = self.digits_item_to_id[i]
            digit_str = str((val_int // self.base ** i) % self.base)
            if digit_str not in item_to_id:
                # Fall back to the closest digit that was seen at this position.
                allowed_digits = np.array([int(d) for d in item_to_id.keys()])
                near_id = np.argmin(np.abs(allowed_digits - int(digit_str)))
                digit_str = str(allowed_digits[near_id])
                logging.warning('out of domain num is encounterd, use nearest code')
            codes.append(item_to_id[digit_str])
        return codes

    def decode(self, ids: List[int]) -> str:
        """
        Decode `code_len` token ids (most significant position first) into the
        string form of the integer, or into the nan token when hasnan is True
        and the first id is the nan id.
        """
        if self.hasnan and ids[0] == self.NA_token_id[0]:
            return self.NA_token
        v = 0
        for i in reversed(range(self.code_len)):
            digit = int(self.digits_id_to_item[i][ids[self.code_len - i - 1]])
            v += digit * self.base ** i
        v = self.reverse_convert_to_int(v)
        return str(v)
| |
|
| |
|
class FloatCode(IntCode):
    """
    Codes a float column with the digit scheme of IntCode.

    Values are passed through a fitted sklearn transformer, shifted by the
    smallest transformed value so they are non-negative, multiplied by
    ``base ** extra_digits`` and truncated to integers before being split
    into digits.
    """

    def __init__(
        self,
        col_name: str,
        code_len: int,
        start_id: int,
        fillall: bool = True,
        base: int = 100,
        hasnan: bool = True,
        transform: str = 'quantile',
    ):
        """
        @params:
            col_name, code_len, start_id, fillall, base, hasnan: forwarded to IntCode.
            transform: one of 'yeo-johnson', 'quantile' or 'robust'; selects the
                sklearn transformer applied before the values are discretised.
        @raises ValueError: if `transform` is not one of the supported names.
        """
        super().__init__(col_name, code_len, start_id, fillall, base, hasnan)
        if transform == 'yeo-johnson':
            self.scaler = PowerTransformer(standardize=True)
        elif transform == 'quantile':
            self.scaler = QuantileTransformer(output_distribution='uniform', n_quantiles=100)
        elif transform == 'robust':
            self.scaler = RobustScaler()
        else:
            raise ValueError('Supported data transformations are "yeo-johnson", "quantile", and "robust"')

    def convert_to_int(self, val: float) -> int:
        """Apply the fitted transform, shift and scaling to one value and truncate it to an integer."""
        val = np.expand_dims(np.array(val), axis=0)
        values = self.scaler.transform(val[:, None])[:, 0] - self.mval
        values = (values * self.base ** self.extra_digits).astype(int)
        return values[0]

    def array_convert_to_int(self, val: ndarray):
        """
        Fit the transformer on `val` and return the scaled integer values.

        Sets `mval` (minimum of the transformed data) and `extra_digits` (how
        many of the `code_len` digits are left for the fractional part).

        @raises ValueError: if the integer part of the largest shifted value needs
            more than `code_len` digits, or if that value is not a number.
        """
        values = self.scaler.fit_transform(val[:, None])[:, 0]
        self.mval = values.min()
        values = values - self.mval
        max_val = values.max()
        if max_val > 0:
            digits = int(math.log(max_val, self.base)) + 1
            # math.log is inexact at exact powers of the base (for example
            # math.log(1000, 10) == 2.9999999999999996), which would leave the
            # largest value one digit short; bump the count until it fits.
            while max_val >= self.base ** digits:
                digits += 1
        elif max_val == 0:
            # Constant column: every shifted value is 0 and log(0) is undefined.
            digits = 1
        else:
            # Only reachable when the maximum is NaN (shifted values are never negative).
            raise ValueError(f"cannot compute the code of column {self.name}: transformed data contains nan")

        extra_digits = self.code_len - digits
        if extra_digits < 0:
            raise ValueError("need large length to code the nummber")
        self.extra_digits = extra_digits
        values = (values * self.base ** self.extra_digits).astype(int)
        return values

    def reverse_convert_to_int(self, val: int) -> float:
        """Undo the scaling and the shift, then invert the fitted transform."""
        val = val / self.base ** self.extra_digits
        val = np.expand_dims(np.array(val), axis=0)
        v = self.scaler.inverse_transform(val[:, None] + self.mval)[0, 0]
        return v

    def decode(self, ids: List[int]) -> str:
        """
        Decode `code_len` token ids (most significant position first) into a
        fixed-point string, or into the nan token when hasnan is True and the
        first id is the nan id.
        """
        if self.hasnan and ids[0] == self.NA_token_id[0]:
            return self.NA_token
        v = 0
        for i in reversed(range(self.code_len)):
            digit = int(self.digits_id_to_item[i][ids[self.code_len - i - 1]])
            v += digit * self.base ** i
        v = self.reverse_convert_to_int(v)
        # Number of decimals derived from the scaling factor, at least one.
        accuracy = max(int(abs(np.log10(0.1 / self.base ** self.extra_digits))), 1)
        return f"{v:.{accuracy}f}"
| |
|
| |
|
class CategoryCode(Code):
    """Codes a categorical column with a single token: one id per distinct value."""

    def __init__(self, col_name: str, start_id: int):
        """
        @params:
            col_name: name of the column
            start_id: offset for token_id.
        """
        # One token per item, fillall=True, hasnan=False.
        super().__init__(col_name, 1, start_id, True, False)

    def compute_code(self, data_series: ndarray):
        """
        Assign consecutive ids, starting at `start_id`, to the unique values of
        `data_series` in the order returned by np.unique. Items are keyed by
        their string form. Calling it again rebuilds the mapping from `start_id`
        instead of continuing after the previous `end_id`.

        @params:
            data_series: an array of input data used to calculate mapping
        """
        uniq_items = np.unique(data_series).tolist()
        id_to_item = {}
        item_to_id = {}
        for token_id, raw_item in enumerate(uniq_items, start=self.start_id):
            item = str(raw_item)
            item_to_id[item] = token_id
            id_to_item[token_id] = item
        # Derive end_id from start_id so that a refit does not shift the id range.
        self.end_id = self.start_id + len(uniq_items)
        self.id_to_item = id_to_item
        self.item_to_id = item_to_id

    def encode(self, item) -> List[int]:
        """Return the single-element id list for `item`; raises KeyError for an unknown item."""
        return [self.item_to_id[item]]

    def decode(self, ids: List[int]) -> str:
        """Return the item string stored for the first id in `ids`."""
        return self.id_to_item[ids[0]]
| |
|
| |
|
# Maps the 'code_type' value of a column config to the coder class that implements it.
column_map = {"int": IntCode, "float": FloatCode, "category": CategoryCode}
| |
|
| |
|
class ColumnCodes(object):
    """Ordered registry of per-column coders that share one token id space."""

    def __init__(self):
        self.column_codes: Dict[str, Code] = {}
        # Column names in registration order, and the code_len of each of them.
        self.columns = []
        self.sizes = []

    @property
    def vocab_size(self):
        """`end_id` of the most recently registered column, or 0 when nothing is registered."""
        if not self.columns:
            return 0
        return self.column_codes[self.columns[-1]].end_id

    def register(self, name: str, ccode: "Code"):
        """Append column `name` with its coder `ccode` to the registry."""
        self.columns.append(name)
        self.column_codes[name] = ccode
        self.sizes.append(ccode.code_len)

    def encode(self, col: str, item: str) -> List[int]:
        """
        Encode `item` with the coder registered for column `col`.

        @raises ValueError: if `col` is not registered.
        """
        if col not in self.column_codes:
            raise ValueError(f"cannot encode {col} {item}")
        return self.column_codes[col].encode(item)

    def decode(self, col: str, ids: List[int]) -> str:
        """
        Decode `ids` with the coder registered for column `col`.

        @raises ValueError: if `col` is not registered.
        """
        if col not in self.column_codes:
            raise ValueError("cannot decode")
        return self.column_codes[col].decode(ids)

    def get_range(self, column_id: int) -> List[Tuple[int, int]]:
        """Return the `code_range` of the column registered at position `column_id`."""
        return self.column_codes[self.columns[column_id]].code_range

    @classmethod
    def get_column_codes(cls, column_configs, example_arrays):
        """
        Build a registry from column configs and example data.

        @params:
            column_configs: iterable of mappings with the keys 'name' and 'code_type'
                (a key of column_map) and an optional 'args' mapping of extra
                constructor arguments for the coder.
            example_arrays: indexable by column name; each entry is passed to the
                coder's compute_code.
        @returns a ColumnCodes in which every column starts where the previous
            one ended (the first one starts at 0).
        """
        column_codes = cls()
        next_start_id = 0
        for config in column_configs:
            col_name = config['name']
            coder = column_map[config['code_type']]
            # Copy so that the caller's config is not modified by the two keys
            # added below; a missing or empty 'args' entry means no extra arguments.
            args = dict(config.get('args') or {})
            args['start_id'] = next_start_id
            args['col_name'] = col_name
            cc = coder(**args)
            cc.compute_code(example_arrays[col_name])
            column_codes.register(col_name, cc)
            next_start_id = cc.end_id
        return column_codes
| |
|