import re
from collections import Counter

from .linalg import Matrix, np, zeros

TOKEN_PATTERN = re.compile(r"[A-Za-z0-9']+")
FRAMETOKEN_WORD_PREFIX = "▁"


def tokenize(text: str) -> list[str]:
    """Lowercase the text and split it into alphanumeric/apostrophe tokens."""
    return TOKEN_PATTERN.findall(text.lower())


def build_vocabulary(
    tokens: list[str],
    min_frequency: int = 1,
    max_vocab: int | None = None,
) -> tuple[dict[str, int], list[str]]:
    """Count the tokens and build a vocabulary from the resulting frequencies."""
    counts = Counter(tokens)
    return build_vocabulary_from_counts(
        counts,
        min_frequency=min_frequency,
        max_vocab=max_vocab,
    )


def build_vocabulary_from_counts(
    counts: dict[str, float],
    min_frequency: int = 1,
    max_vocab: int | None = None,
) -> tuple[dict[str, int], list[str]]:
    """Build ``(token_to_id, id_to_token)`` from token counts.

    Tokens are sorted by descending count (ties broken alphabetically),
    filtered by ``min_frequency``, and optionally capped at ``max_vocab``.
    FrameToken-style vocabularies get a priority ordering before the cap so
    that fallback characters are dropped first.
    """
    items = [
        (token, count)
        for token, count in sorted(counts.items(), key=lambda pair: (-pair[1], pair[0]))
        if count >= min_frequency
    ]
    if max_vocab is not None:
        if any(_looks_like_frametoken(token) for token, _ in items):
            items = _prioritize_frametoken_output_items(items)[:max_vocab]
        else:
            items = items[:max_vocab]
    id_to_token = [token for token, _ in items]
    token_to_id = {token: index for index, token in enumerate(id_to_token)}
    return token_to_id, id_to_token


def _looks_like_frametoken(token: str) -> bool:
    return token.startswith(FRAMETOKEN_WORD_PREFIX) or (
        token.startswith("<") and token.endswith(">")
    )


def _is_special_token(token: str) -> bool:
    return token.startswith("<") and token.endswith(">")


def _is_word_start_token(token: str) -> bool:
    return token.startswith(FRAMETOKEN_WORD_PREFIX)


def _is_single_letter_word_start(token: str) -> bool:
    if not token.startswith(FRAMETOKEN_WORD_PREFIX):
        return False
    rendered = token[len(FRAMETOKEN_WORD_PREFIX) :]
    return len(rendered) == 1 and rendered.isalpha() and rendered not in {"A", "I"}


def _is_bare_fallback_token(token: str) -> bool:
    return len(token) == 1 and not token.startswith(FRAMETOKEN_WORD_PREFIX)


def _prioritize_frametoken_output_items(
    items: list[tuple[str, float]],
) -> list[tuple[str, float]]:
    # FrameToken keeps fallback characters for encoding coverage, but the model's
    # output/readout vocabulary should spend its capped slots on answerable tokens.
    def priority(item: tuple[str, float]) -> tuple[int, float, str]:
        token, count = item
        if _is_special_token(token):
            group = 0  # "<...>" specials always keep their slots
        elif _is_single_letter_word_start(token):
            group = 3  # single-letter word starts (other than "A"/"I") rank low
        elif _is_word_start_token(token):
            group = 1  # ordinary word-start pieces
        elif _is_bare_fallback_token(token):
            group = 4  # bare single-character fallbacks are dropped first
        else:
            group = 2  # remaining (continuation) pieces
        return (group, -count, token)

    return sorted(items, key=priority)


def build_cooccurrence_matrix(
    tokens: list[str],
    token_to_id: dict[str, int],
    window_size: int,
) -> Matrix:
    """Build a symmetric co-occurrence matrix with 1/distance weighting.

    Each pair of in-vocabulary tokens within ``window_size`` positions of each
    other contributes ``1 / offset`` to both ``matrix[i][j]`` and ``matrix[j][i]``.
    """
    size = len(token_to_id)
    token_ids = [token_to_id[token] for token in tokens if token in token_to_id]
    if np is not None and size > 0 and token_ids:
        # Vectorized path: for each offset, pair every token with its neighbor
        # ``offset`` positions ahead and accumulate the distance-decayed weight.
        matrix = np.zeros((size, size), dtype=np.float64)
        token_array = np.asarray(token_ids, dtype=np.int64)
        for offset in range(1, window_size + 1):
            if len(token_array) <= offset:
                break
            left = token_array[:-offset]
            right = token_array[offset:]
            weight = 1.0 / offset
            np.add.at(matrix, (left, right), weight)
            np.add.at(matrix, (right, left), weight)
        return matrix.tolist()
    # Pure-Python fallback when NumPy is unavailable.
    matrix = zeros(size, size)
    for index, token_id in enumerate(token_ids):
        for offset in range(1, window_size + 1):
            other_index = index + offset
            if other_index >= len(token_ids):
                break
            other_id = token_ids[other_index]
            weight = 1.0 / offset
            matrix[token_id][other_id] += weight
            matrix[other_id][token_id] += weight
    return matrix
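

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only; the sample text below is not
    # taken from the project): tokenize a snippet, build a frequency-capped
    # vocabulary, and print its co-occurrence matrix for a window of 2.
    sample = "the quick brown fox jumps over the lazy dog and the quick dog"
    sample_tokens = tokenize(sample)
    token_to_id, id_to_token = build_vocabulary(sample_tokens, min_frequency=1, max_vocab=8)
    cooc = build_cooccurrence_matrix(sample_tokens, token_to_id, window_size=2)
    for token, row in zip(id_to_token, cooc):
        print(f"{token:>8}", [round(value, 2) for value in row])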