"""Deterministic value-to-token mapping for structured transaction features.

Each feature has its own tokenizer that maps raw values to token IDs using
the reserved token convention (D6): 0=MASK, 1=OOV, 2=NULL, 3+=real values.
Categorical features map directly; bucketed features use quantile or uniform
boundaries computed during fit().

OOV handling (D3): values outside the known range map to OOV token (1) and
increment a per-feature counter logged during eval.
"""
|
|
| import hashlib |
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
|
|
| from src.data.schema import ( |
| FeatureSchema, |
| SchemaConfig, |
| MASK_TOKEN, |
| NULL_TOKEN, |
| OOV_TOKEN, |
| VALUES_START, |
| load_schema, |
| ) |
|
|
|
|
class FeatureTokenizer:
    """Tokenizer for a single feature: encode, decode, and OOV tracking.

    Token IDs below ``VALUES_START`` are reserved special tokens; real values
    are stored as ``value_index + VALUES_START``.  Non-bucketed features are
    usable immediately, while bucketed features need boundaries (set by
    ``fit()`` or ``fit_uniform_from_range()``) before ``encode()`` may run.
    """

    def __init__(self, schema: FeatureSchema) -> None:
        self._schema = schema
        # Bucket edges (num_values + 1 entries); stays None until fitted and
        # is never set for non-bucketed features by this class.
        self._boundaries: np.ndarray | None = None
        # Number of values mapped to OOV_TOKEN since the last reset.
        self._oov_count: int = 0
        # Only bucketed features require a fitting step.
        self._fitted: bool = schema.type != "bucketed"

    @property
    def name(self) -> str:
        """Feature name as declared in the schema."""
        return self._schema.name

    @property
    def vocab_size(self) -> int:
        """Vocabulary size as declared in the schema."""
        return self._schema.vocab_size

    @property
    def is_fitted(self) -> bool:
        """Whether ``encode()`` may be called."""
        return self._fitted

    @property
    def oov_count(self) -> int:
        """Number of OOV tokens emitted by ``encode()`` since the last reset."""
        return self._oov_count

    def reset_oov_count(self) -> None:
        """Zero the OOV counter."""
        self._oov_count = 0

    def fit(self, values: np.ndarray) -> None:
        """Compute bucket boundaries from data. Only valid for bucketed features.

        For ``bucket_method == "uniform"`` the boundaries come from the
        schema's ``bucket_range`` and ``values`` is ignored.  For
        ``"quantile"`` the boundaries are the ``num_values + 1`` evenly spaced
        quantiles of the finite entries of ``values``; heavily tied data can
        therefore yield repeated boundaries (empty buckets).

        Args:
            values: array of raw continuous values to compute boundaries from.
                NaN and infinite entries are ignored.

        Raises:
            ValueError: if ``bucket_method`` is unknown, or if quantile
                fitting is requested and ``values`` has no finite entries.
        """
        assert self._schema.type == "bucketed", (
            f"fit() only applies to bucketed features, not {self._schema.type}"
        )
        assert self._schema.bucket_range is not None
        assert self._schema.bucket_method is not None

        method = self._schema.bucket_method
        if method == "uniform":
            self.fit_uniform_from_range()
            return
        if method != "quantile":
            raise ValueError(f"Unknown bucket_method: {self._schema.bucket_method}")

        samples = np.asarray(values, dtype=np.float64).ravel()
        # A single NaN (or inf, through interpolation) would turn the
        # boundaries into NaN and make every later encode() meaningless.
        samples = samples[np.isfinite(samples)]
        if samples.size == 0:
            raise ValueError(
                f"Cannot fit quantile boundaries for feature '{self.name}': "
                "no finite values provided."
            )

        quantiles = np.linspace(0.0, 1.0, self._schema.num_values + 1)
        self._boundaries = np.quantile(samples, quantiles).astype(np.float64)
        self._fitted = True

    def fit_uniform_from_range(self) -> None:
        """Compute uniform boundaries directly from the schema's bucket_range."""
        assert self._schema.type == "bucketed"
        assert self._schema.bucket_range is not None
        lo, hi = self._schema.bucket_range
        self._boundaries = np.linspace(
            lo, hi, self._schema.num_values + 1, dtype=np.float64
        )
        self._fitted = True

    def encode(self, values: np.ndarray) -> np.ndarray:
        """Map raw values to token IDs and update the OOV counter.

        Args:
            values: array (or array-like) of raw feature values, any shape.

        Returns:
            int16 array of token IDs, same shape as input.
        """
        assert self._fitted, f"Feature '{self.name}' not fitted. Call fit() first."
        values = np.asarray(values)
        flat = values.ravel()

        if self._schema.type == "bucketed":
            token_ids, oov_count = self._encode_bucketed(flat)
        else:
            token_ids, oov_count = self._encode_categorical(flat)

        self._oov_count += oov_count
        return token_ids.reshape(values.shape)

    def decode(self, token_ids: np.ndarray) -> np.ndarray:
        """Map token IDs back to values. Bucketed features return bucket centers.

        Tokens below ``VALUES_START`` (the special tokens) decode to NaN.  For
        a bucketed feature with boundaries, an index past the last bucket also
        decodes to NaN.  Otherwise the zero-based value index is returned
        as-is, without checking it against the schema's ``num_values``; this
        is also what happens for a bucketed feature that has no boundaries yet.

        Args:
            token_ids: array (or array-like) of token IDs, any shape.

        Returns:
            float64 array of decoded values, same shape as input.
        """
        token_ids = np.asarray(token_ids)
        flat = token_ids.ravel().astype(np.int64)
        result = np.full(len(flat), np.nan, dtype=np.float64)

        value_mask = flat >= VALUES_START
        value_indices = flat[value_mask] - VALUES_START

        if self._schema.type == "bucketed" and self._boundaries is not None:
            centers = (self._boundaries[:-1] + self._boundaries[1:]) / 2.0
            valid = value_indices < len(centers)
            result_positions = np.where(value_mask)[0]
            result[result_positions[valid]] = centers[value_indices[valid]]
        else:
            result[value_mask] = value_indices.astype(np.float64)

        return result.reshape(token_ids.shape)

    def _encode_categorical(self, values: np.ndarray) -> tuple[np.ndarray, int]:
        """Encode categorical/binary values. Returns (token_ids, oov_count).

        Values are truncated toward zero to integers; anything outside
        ``[0, num_values)`` becomes ``OOV_TOKEN``.  NaN and infinite inputs
        are always OOV.
        """
        values = np.asarray(values)
        num_values = self._schema.num_values

        if np.issubdtype(values.dtype, np.floating):
            # Range-check on the float before casting: converting NaN, inf or
            # an out-of-range float to int64 is undefined and differs between
            # platforms, so those entries must never reach astype().
            truncated = np.trunc(values)
            in_vocab = (truncated >= 0) & (truncated < num_values)
            int_values = np.where(in_vocab, truncated, 0).astype(np.int64)
        else:
            raw = values.astype(np.int64)
            in_vocab = (raw >= 0) & (raw < num_values)
            # Zero out OOV entries so the offset addition cannot wrap around.
            int_values = np.where(in_vocab, raw, 0)

        oov_mask = ~in_vocab
        token_ids = (int_values + VALUES_START).astype(np.int16)
        token_ids[oov_mask] = OOV_TOKEN
        return token_ids, int(oov_mask.sum())

    def _encode_bucketed(self, values: np.ndarray) -> tuple[np.ndarray, int]:
        """Encode bucketed values using pre-computed boundaries.

        Returns (token_ids, oov_count). Uses np.digitize on internal boundaries
        so bucket index i covers [boundaries[i], boundaries[i+1]).
        The last bucket includes its upper bound: [boundaries[-2], boundaries[-1]].
        Values outside [boundaries[0], boundaries[-1]], including NaN, map to
        ``OOV_TOKEN``.
        """
        assert self._boundaries is not None
        values = np.asarray(values)

        internal_bins = self._boundaries[1:-1]
        bucket_idx = np.digitize(values, internal_bins)
        # Defensive: keeps the index inside the vocabulary even if the stored
        # boundaries do not have exactly num_values + 1 entries.
        bucket_idx = np.clip(bucket_idx, 0, self._schema.num_values - 1)

        # Written as "not inside" rather than "below or above" so that NaN,
        # for which every comparison is False, is classified as OOV instead
        # of falling through into the last bucket.
        in_range = (values >= self._boundaries[0]) & (values <= self._boundaries[-1])
        oov_mask = ~in_range

        token_ids = (bucket_idx + VALUES_START).astype(np.int16)
        token_ids[oov_mask] = OOV_TOKEN
        return token_ids, int(oov_mask.sum())

    def get_state(self) -> dict[str, Any]:
        """Serializable state for fingerprinting and persistence.

        Contains the schema's name, type, num_values and vocab_size, plus the
        bucket boundaries (as a plain list) when they have been set.
        """
        state: dict[str, Any] = {
            "name": self._schema.name,
            "type": self._schema.type,
            "num_values": self._schema.num_values,
            "vocab_size": self._schema.vocab_size,
        }
        if self._boundaries is not None:
            state["boundaries"] = self._boundaries.tolist()
        return state
|
|
|
|
class TransactionTokenizer:
    """Orchestrates tokenization across all features in the schema.

    Holds one ``FeatureTokenizer`` per schema feature, keyed by feature name,
    and forwards fit/encode/decode calls to it.

    Usage:
        schema = load_schema("data/schema.yaml")
        tokenizer = TransactionTokenizer(schema)

        # Fit bucketed features from raw data
        tokenizer.fit_feature("amount", raw_amounts)
        tokenizer.fit_feature("days_since_last", raw_days)

        # Encode
        token_ids = tokenizer.encode_feature("amount", raw_values)

        # Save state and compute fingerprint
        tokenizer.save_state("data/synthetic/tokenizer_state.json")
        fp = tokenizer.compute_fingerprint("data/schema.yaml")
    """

    def __init__(self, schema: SchemaConfig) -> None:
        self._schema = schema
        self._tokenizers: dict[str, FeatureTokenizer] = {
            feature.name: FeatureTokenizer(feature) for feature in schema.features
        }

    @property
    def feature_names(self) -> list[str]:
        """Feature names as reported by the schema."""
        return self._schema.feature_names()

    @property
    def num_features(self) -> int:
        """Number of features as reported by the schema."""
        return self._schema.num_features

    def get_feature_tokenizer(self, name: str) -> FeatureTokenizer:
        """Return the tokenizer for ``name``; raises KeyError if unknown."""
        return self._tokenizers[name]

    def fit_feature(self, name: str, values: np.ndarray) -> None:
        """Fit bucket boundaries for a single bucketed feature."""
        self._tokenizers[name].fit(values)

    def is_all_fitted(self) -> bool:
        """True when every feature tokenizer reports itself as fitted."""
        return all(t.is_fitted for t in self._tokenizers.values())

    def encode_feature(self, name: str, values: np.ndarray) -> np.ndarray:
        """Encode raw values for one feature. Returns int16 token IDs."""
        return self._tokenizers[name].encode(values)

    def decode_feature(self, name: str, token_ids: np.ndarray) -> np.ndarray:
        """Decode token IDs for one feature. Returns float64 values."""
        return self._tokenizers[name].decode(token_ids)

    def inject_nulls(
        self, token_ids: np.ndarray, null_mask: np.ndarray
    ) -> np.ndarray:
        """Replace positions where null_mask is True with NULL token.

        Args:
            token_ids: int16 array of token IDs.
            null_mask: boolean array, same shape as token_ids.

        Returns:
            Copy of token_ids with NULLs injected; the input is not modified.
        """
        result = token_ids.copy()
        result[null_mask] = NULL_TOKEN
        return result

    @property
    def oov_counts(self) -> dict[str, int]:
        """Current OOV counter of every feature, keyed by feature name."""
        return {name: t.oov_count for name, t in self._tokenizers.items()}

    def reset_oov_counts(self) -> None:
        """Zero the OOV counter of every feature."""
        for t in self._tokenizers.values():
            t.reset_oov_count()

    def get_state(self) -> dict[str, Any]:
        """Full tokenizer state for persistence and fingerprinting.

        Features are listed in the order given by the schema's
        ``feature_names()``.
        """
        return {
            "num_features": self._schema.num_features,
            "num_transactions": self._schema.num_transactions,
            "features": [
                self._tokenizers[name].get_state()
                for name in self._schema.feature_names()
            ],
        }

    def save_state(self, path: str | Path) -> None:
        """Save tokenizer state to JSON for fingerprint computation (D4).

        Parent directories are created if missing.
        """
        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        state = self.get_state()
        with open(path, "w", encoding="utf-8") as fh:
            json.dump(state, fh, indent=2, sort_keys=True)

    @classmethod
    def from_state(cls, state_path: str | Path, schema: SchemaConfig) -> "TransactionTokenizer":
        """Reconstruct tokenizer from saved state.

        Only bucket boundaries are restored from the file; everything else
        comes from ``schema``.  A feature whose saved state has no boundaries
        keeps the fitted status it gets from its schema type, so a bucketed
        feature that was saved unfitted stays unfitted.

        Args:
            state_path: JSON file written by ``save_state()``.
            schema: schema the saved state belongs to.

        Raises:
            KeyError: if the state names a feature that is not in ``schema``.
            ValueError: if a bucketed feature's saved boundaries do not have
                ``num_values + 1`` entries.
        """
        with open(state_path, encoding="utf-8") as fh:
            state = json.load(fh)

        tokenizer = cls(schema)
        for feat_state in state["features"]:
            name = feat_state["name"]
            if name not in tokenizer._tokenizers:
                raise KeyError(
                    f"Saved state refers to feature '{name}', "
                    "which is not in the schema"
                )
            ft = tokenizer._tokenizers[name]
            if "boundaries" not in feat_state:
                continue

            boundaries = np.array(feat_state["boundaries"], dtype=np.float64)
            expected = ft._schema.num_values + 1
            # A mismatch means the state was produced for a different bucket
            # count; loading it would silently mis-bucket every value.
            if ft._schema.type == "bucketed" and boundaries.shape != (expected,):
                raise ValueError(
                    f"Feature '{name}': saved state has {boundaries.size} "
                    f"boundaries but the schema requires {expected}"
                )
            ft._boundaries = boundaries
            ft._fitted = True
        return tokenizer

    def compute_fingerprint(self, *config_paths: str | Path) -> str:
        """SHA256 fingerprint of tokenizer state plus config files (D4).

        The fingerprint includes the tokenizer's bucket boundaries and vocab
        sizes, plus the raw bytes of any additional config files (schema,
        generator config, split indices). Eval refuses to run if fingerprints
        don't match.

        Files are hashed in the sorted order of their path strings, so the
        argument order does not matter.  Only file contents are hashed: the
        paths themselves are not, and no separator is inserted between files.

        Args:
            config_paths: paths to config files to include in the hash.

        Returns:
            Hex-encoded SHA256 digest.
        """
        hasher = hashlib.sha256()

        state_bytes = json.dumps(self.get_state(), sort_keys=True).encode("utf-8")
        hasher.update(state_bytes)

        for path in sorted(str(p) for p in config_paths):
            with open(path, "rb") as fh:
                # Feeding the hash in chunks gives the same digest as one big
                # update() while keeping memory use flat for large files.
                while chunk := fh.read(1 << 20):
                    hasher.update(chunk)

        return hasher.hexdigest()
|
|