| |
|
|
| r'''############################################################################ |
| ################################################################################ |
| # |
| # |
| # Tegridy Cupy Python Module (TCUPY) |
| # Version 1.0 |
| # |
| # Project Los Angeles |
| # |
| # Tegridy Code 2025 |
| # |
| # https://github.com/asigalov61/tegridy-tools |
| # |
| # |
| ################################################################################ |
| # |
| # Copyright 2024 Project Los Angeles / Tegridy Code |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| ################################################################################ |
| ################################################################################ |
| # |
| # Critical dependencies |
| # |
| # !pip install cupy-cuda12x |
| # !pip install numpy==1.24.4 |
| # |
| ################################################################################ |
| ''' |
|
|
| |
|
|
| print('=' * 70) |
| print('Loading module...') |
| print('Please wait...') |
| print('=' * 70) |
|
|
| |
|
|
| import sys |
| import os |
|
|
| |
|
|
| try: |
| import cupy as cp |
| import cupy as np |
| print('=' * 70) |
| print('CuPy is found!') |
| print('Will use CuPy and GPU for processing!') |
| print('=' * 70) |
|
|
| except ImportError as e: |
| print(f"Error: Could not import CuPy. Details: {e}") |
| |
| |
| print("Please make sure CuPy is installed.") |
| print('=' * 70) |
| |
| raise RuntimeError("CuPy could not be loaded!") from e |
|
|
| |
|
|
| from collections import defaultdict, deque |
| from typing import Optional, Tuple, Dict, Any, List |
|
|
| |
|
|
| |
| MEMORY_LEN = 12 |
| SEQUENCE_LENGTH = 32 |
|
|
| |
| REPETITION_PENALTY = (1.0, 1.0, 1.0) |
| SPIKE_PENALTY_STRENGTH = (1.0, 1.0, 1.0) |
| SPIKE_SIGMA = (1.0, 1.0, 1.0) |
|
|
| |
|
|
| def find_numpy_array(src_array, trg_array): |
|
|
| """ |
| Finds 1D numpy array in 2D numpy array |
| """ |
|
|
| match_mask = np.all(src_array == trg_array, axis=1) |
| |
| return np.where(match_mask)[0] |
|
|
| |
|
|
| def vertical_list_search(src_list, trg_list): |
| |
| """ |
| For each vertical window of consecutive rows of height len(trg_list) in src_list, |
| this function checks whether for every offset j (0 <= j < len(trg_list)) the row |
| at index (window_start + j) contains trg_list[j]. |
| |
| It returns a list of windows (each a list of consecutive row indices) that meet this condition. |
| """ |
| |
| if not src_list or not trg_list: |
| return [] |
| |
| n = len(src_list) |
| k = len(trg_list) |
| |
| num_windows = n - k + 1 |
| |
| if num_windows <= 0: |
| return [] |
| |
| |
| max_len = max(len(row) for row in src_list) |
| |
| |
| global_min = min(min(row) for row in src_list if row) |
| fill_value = global_min - 1 |
|
|
| |
| A = np.full((n, max_len), fill_value, dtype=np.int64) |
| for i, row in enumerate(src_list): |
| L = len(row) |
| A[i, :L] = row |
|
|
| |
| |
| unique_targets = set(trg_list) |
| |
| present_dict = {} |
| |
| for t in unique_targets: |
| |
| present_dict[t] = np.any(A == t, axis=1) |
| |
| |
| |
| B = np.empty((k, num_windows), dtype=bool) |
| |
| for j in range(k): |
| t = trg_list[j] |
| |
| B[j, :] = present_dict[t][j: j + num_windows] |
| |
| |
| valid_windows_mask = np.all(B, axis=0) |
| valid_starts = np.nonzero(valid_windows_mask)[0] |
| |
| |
| result = [list(range(s, s + k)) for s in valid_starts] |
| |
| return result |
|
|
|
|
| |
|
|
| def pack_sequences(train_data, pad_val=-1): |
| """ |
| Packs a list of variable-length token sequences into a 2D CuPy array. |
| |
| This version computes lengths and builds the padded array and mask entirely on GPU. |
| It converts each sequence into a CuPy array, concatenates them, and assigns tokens in one shot. |
| |
| Returns: |
| batch: a CuPy array of shape (n, max_len) |
| lengths: a CuPy array of shape (n,) containing each sequence's length. |
| """ |
| n = len(train_data) |
| |
| lengths = cp.array([len(seq) for seq in train_data], dtype=cp.int64) |
| max_len_val = int(cp.max(lengths).get()) |
| |
| batch = cp.full((n, max_len_val), pad_val, dtype=cp.int64) |
| |
| mask = cp.arange(max_len_val).reshape(1, max_len_val) < lengths.reshape(n, 1) |
| |
| sequences = [cp.array(seq, dtype=cp.int64) for seq in train_data] |
| flat = cp.concatenate(sequences) |
| |
| batch[mask] = flat |
| return batch, lengths |
|
|
| |
|
|
| def count_best_pair_gpu(batch, lengths, factor, pad_val=-1): |
| """ |
| Given the entire GPU-resident packed data, compute the most frequent |
| adjacent pair (encoded as: pair_val = first * factor + second) on GPU. |
| """ |
| n, L = batch.shape |
| cols = cp.arange(L - 1, dtype=cp.int64) |
| cols_expanded = cp.broadcast_to(cols, (n, L - 1)) |
| valid_mask = cols_expanded < cp.reshape(lengths, (n, 1)) - 1 |
|
|
| first_tokens = batch[:, :L - 1] |
| second_tokens = batch[:, 1:L] |
| valid_first = first_tokens[valid_mask] |
| valid_second = second_tokens[valid_mask] |
|
|
| pairs = valid_first * factor + valid_second |
| if pairs.size == 0: |
| return None |
|
|
| sorted_pairs = cp.sort(pairs) |
| diff = cp.diff(sorted_pairs) |
| boundaries = cp.nonzero(diff)[0] + 1 |
| group_starts = cp.concatenate([cp.array([0], dtype=cp.int64), boundaries]) |
| group_ends = cp.concatenate([boundaries, cp.array([sorted_pairs.size], dtype=cp.int64)]) |
| group_counts = group_ends - group_starts |
|
|
| max_idx = int(cp.argmax(group_counts)) |
| best_pair_enc = int(sorted_pairs[group_starts[max_idx]]) |
| best_freq = int(group_counts[max_idx]) |
| first = best_pair_enc // factor |
| second = best_pair_enc % factor |
| return (first, second, best_freq) |
|
|
| |
|
|
| merge_kernel_code = r''' |
| extern "C" __global__ |
| void merge_pair_kernel(const long* input, long* output, |
| const long* input_lengths, long* output_lengths, |
| const long num_rows, const long num_cols, |
| const long a, const long b, const long new_token, |
| const long pad_val) { |
| int row = blockIdx.x * blockDim.x + threadIdx.x; |
| if (row >= num_rows) return; |
| long in_length = input_lengths[row]; |
| long out_idx = 0; |
| bool skip_next = false; |
| for (long i = 0; i < in_length; i++) { |
| if (skip_next) { |
| skip_next = false; |
| continue; |
| } |
| long token = input[row * num_cols + i]; |
| if (i < in_length - 1 && token == a && input[row * num_cols + i + 1] == b) { |
| output[row * num_cols + out_idx] = new_token; |
| out_idx++; |
| skip_next = true; |
| } else { |
| output[row * num_cols + out_idx] = token; |
| out_idx++; |
| } |
| } |
| output_lengths[row] = out_idx; |
| for (long j = out_idx; j < num_cols; j++) { |
| output[row * num_cols + j] = pad_val; |
| } |
| } |
| ''' |
| merge_kernel = cp.RawKernel(merge_kernel_code, 'merge_pair_kernel') |
|
|
| |
|
|
| def learn_bpe_codes_gpu(train_data, vocab_size=4096, max_merges=None, pad_val=-1): |
| """ |
| Learn BPE merge rules completely on GPU. |
| |
| The training data is packed once (using the vectorized pack_sequences). |
| On each merge iteration, the best adjacent pair is computed on GPU and then merged |
| into a new token via a custom merge kernel (with double-buffering). |
| |
| Returns: |
| codes: a list of merge rules as ((first, second), new_token) |
| final_data: the merged training data (list of sequences) |
| """ |
| |
| batch, lengths = pack_sequences(train_data, pad_val) |
| n, L = batch.shape |
|
|
| |
| initial_vocab = {token for seq in train_data for token in seq} |
| next_token = max(initial_vocab) + 1 |
| codes = [] |
| merge_count = 0 |
| pbar = tqdm.tqdm(total=max_merges if max_merges is not None else None, |
| desc="Learning BPE Codes (GPU)", leave=True) |
|
|
| |
| work_batch = cp.empty_like(batch) |
| work_lengths = cp.empty_like(lengths) |
| input_batch = batch |
| input_lengths = lengths |
|
|
| threads_per_block = 128 |
| blocks = (n + threads_per_block - 1) // threads_per_block |
|
|
| while next_token < vocab_size and (max_merges is None or merge_count < max_merges): |
| |
| if bool(cp.all(input_lengths == 1)): |
| pbar.write("All sequences have collapsed; stopping early.") |
| break |
|
|
| factor = next_token |
| best = count_best_pair_gpu(input_batch, input_lengths, factor, pad_val) |
| if best is None: |
| pbar.write("No mergeable pairs found; stopping early.") |
| break |
| |
| best_pair = (best[0], best[1]) |
| best_freq = best[2] |
| if best_freq < 2: |
| pbar.write("Best pair frequency is less than 2; stopping early.") |
| break |
|
|
| codes.append((best_pair, next_token)) |
|
|
| |
| merge_kernel((blocks,), (threads_per_block,), |
| (input_batch, |
| work_batch, |
| input_lengths, |
| work_lengths, |
| cp.int64(n), |
| cp.int64(L), |
| cp.int64(best_pair[0]), |
| cp.int64(best_pair[1]), |
| cp.int64(next_token), |
| cp.int64(pad_val))) |
| |
| input_batch, work_batch = work_batch, input_batch |
| input_lengths, work_lengths = work_lengths, input_lengths |
|
|
| next_token += 1 |
| merge_count += 1 |
| pbar.update(1) |
| pbar.close() |
|
|
| final_batch = cp.asnumpy(input_batch) |
| final_lengths = cp.asnumpy(input_lengths) |
| final_data = [final_batch[i, :final_lengths[i]].tolist() for i in range(n)] |
| return codes, final_data |
|
|
| |
|
|
| fused_merge_kernel_code = r''' |
| extern "C" __global__ |
| void fused_merge_kernel(long* data_in, long* data_out, long* lengths, const long pad_val, |
| const long num_rows, const long max_len, const long num_merges, const long* merge_rules) { |
| int row = blockIdx.x * blockDim.x + threadIdx.x; |
| if (row >= num_rows) return; |
| long base = row * max_len; |
| long cur_len = lengths[row]; |
| long* cur = data_in + base; |
| long* other = data_out + base; |
| // Process each merge rule sequentially. |
| for (int m = 0; m < num_merges; m++) { |
| long a = merge_rules[3 * m]; |
| long b = merge_rules[3 * m + 1]; |
| long new_token = merge_rules[3 * m + 2]; |
| long out_idx = 0; |
| for (int i = 0; i < cur_len; i++) { |
| if (i < cur_len - 1 && cur[i] == a && cur[i+1] == b) { |
| other[out_idx] = new_token; |
| out_idx++; |
| i++; // Skip the next token. |
| } else { |
| other[out_idx] = cur[i]; |
| out_idx++; |
| } |
| } |
| cur_len = out_idx; |
| // Swap pointers for the next merge. |
| long* temp = cur; |
| cur = other; |
| other = temp; |
| } |
| lengths[row] = cur_len; |
| // Pad the remaining positions with pad_val. |
| for (int i = cur_len; i < max_len; i++) { |
| cur[i] = pad_val; |
| } |
| // If the final result is not in data_in, copy back. |
| if (cur != data_in + base) { |
| for (int i = 0; i < cur_len; i++) { |
| data_in[base + i] = cur[i]; |
| } |
| } |
| } |
| ''' |
| fused_kernel = cp.RawKernel(fused_merge_kernel_code, 'fused_merge_kernel') |
|
|
| |
|
|
| def retokenize_train_data_fused_gpu(train_data, codes, pad_val=-1): |
| """ |
| Retokenize training data using the fully fused GPU kernel. |
| |
| The entire training dataset is first packed into GPU memory (using pack_sequences). |
| All learned merge rules (provided in 'codes') are applied via a single kernel launch. |
| Each GPU thread processes one sequence by applying all merge rules sequentially. |
| |
| Returns: |
| tokenized_data: list of retokenized sequences. |
| """ |
| |
| batch, lengths = pack_sequences(train_data, pad_val) |
| n, max_len = batch.shape |
| |
| if len(codes) > 0: |
| merge_rules_list = [[rule[0][0], rule[0][1], rule[1]] for rule in codes] |
| merge_rules_gpu = cp.array(merge_rules_list, dtype=cp.int64) |
| merge_rules_gpu = merge_rules_gpu.reshape(-1) |
| else: |
| merge_rules_gpu = cp.empty((0,), dtype=cp.int64) |
| num_merges = merge_rules_gpu.shape[0] // 3 |
| |
| scratch = cp.empty_like(batch) |
| threads_per_block = 128 |
| blocks = (n + threads_per_block - 1) // threads_per_block |
| |
| fused_kernel((blocks,), (threads_per_block,), |
| (batch, scratch, lengths, cp.int64(pad_val), |
| cp.int64(n), cp.int64(max_len), cp.int64(num_merges), merge_rules_gpu)) |
| final_batch = cp.asnumpy(batch) |
| final_lengths = cp.asnumpy(lengths) |
| tokenized_data = [final_batch[i, :final_lengths[i]].tolist() for i in range(n)] |
| return tokenized_data |
|
|
| |
|
|
| def bpe_encode(seq, codes): |
| """ |
| Iteratively encodes a sequence using BPE merge rules provided in a dictionary. |
| |
| Args: |
| seq (list): A list of tokens (e.g. integers) representing the input sequence. |
| codes (dict): A dictionary mapping token pairs (a tuple of two tokens) |
| to a merged token. For example: |
| { (1, 2): 100, (100, 3): 101 } |
| |
| Returns: |
| list: The encoded sequence after applying all possible merges. |
| |
| The function repeatedly scans the entire sequence from left to right; |
| whenever it finds a contiguous token pair that exists as a key in the |
| codes dict, it replaces that pair with the merged token. This pass is |
| repeated until no more merges are possible. |
| """ |
|
|
| if type(codes) == list: |
| codes = dict(codes) |
| |
| encoded_seq = seq.copy() |
| done = False |
| while not done: |
| new_seq = [] |
| i = 0 |
| changed = False |
| while i < len(encoded_seq): |
| |
| if i < len(encoded_seq) - 1 and (encoded_seq[i], encoded_seq[i + 1]) in codes: |
| new_seq.append(codes[(encoded_seq[i], encoded_seq[i + 1])]) |
| i += 2 |
| changed = True |
| else: |
| new_seq.append(encoded_seq[i]) |
| i += 1 |
| |
| if not changed: |
| done = True |
| encoded_seq = new_seq |
| return encoded_seq |
|
|
| |
|
|
| def bpe_decode(seq, codes): |
| """ |
| Decodes a sequence encoded with BPE merge rules defined in a codes dictionary. |
| |
| Args: |
| seq (list): The encoded sequence (a list of tokens). |
| codes (dict): A dictionary mapping token pairs to the merged token, used during encoding. |
| |
| Returns: |
| list: The fully decoded sequence, with all merged tokens recursively expanded. |
| |
| The function constructs a reverse mapping that converts a merged token back into |
| its constituent pair. Each token in the sequence is then recursively expanded. |
| """ |
|
|
| if type(codes) == list: |
| codes = dict(codes) |
| |
| |
| reverse_mapping = {merged: pair for pair, merged in codes.items()} |
|
|
| def recursive_expand(token): |
| |
| if token in reverse_mapping: |
| a, b = reverse_mapping[token] |
| return recursive_expand(a) + recursive_expand(b) |
| else: |
| return [token] |
|
|
| decoded_seq = [] |
| for token in seq: |
| decoded_seq.extend(recursive_expand(token)) |
| return decoded_seq |
|
|
| |
|
|
| def ensure_triplet(val: Any, name: str = "") -> Tuple[float, float, float]: |
| """ |
| Ensure the given parameter is returned as a triplet. |
| If provided as a scalar, promote it to a triplet. |
| """ |
| if np.isscalar(val): |
| return (float(val), float(val), float(val)) |
| elif isinstance(val, (list, tuple)) and len(val) == 3: |
| return tuple(float(x) for x in val) |
| else: |
| raise ValueError(f"{name} must be a scalar or a sequence of 3 numbers.") |
|
|
| |
|
|
| REP_PENALTY = ensure_triplet(REPETITION_PENALTY, "REPETITION_PENALTY") |
| SPIKE_STRENGTH = ensure_triplet(SPIKE_PENALTY_STRENGTH, "SPIKE_PENALTY_STRENGTH") |
| SPIKE_SIG = ensure_triplet(SPIKE_SIGMA, "SPIKE_SIGMA") |
|
|
| |
|
|
| def sliding_window_view_alternative(a: np.ndarray, window_length: int) -> np.ndarray: |
| """ |
| Create a sliding-window view (without copying) of an array. |
| Expected input shape: (n, L, d) and returns: (n, L - window_length + 1, window_length, d) |
| """ |
| n, L, d = a.shape |
| new_shape = (n, L - window_length + 1, window_length, d) |
| new_strides = (a.strides[0], a.strides[1], a.strides[1], a.strides[2]) |
| return np.lib.stride_tricks.as_strided(a, shape=new_shape, strides=new_strides) |
|
|
| |
|
|
| def build_ngram_mapping(data: np.ndarray, memory_len: int) -> Dict[Any, Dict[Any, int]]: |
| """ |
| Build an n-gram mapping from a context (a sequence of triplets) to candidate triplets with frequencies. |
| """ |
| n, L, d = data.shape |
| window_length = memory_len + 1 |
| windows = sliding_window_view_alternative(data, window_length) |
| |
|
|
| |
| contexts = windows[:, :, :memory_len, :] |
| candidates = windows[:, :, memory_len, :] |
|
|
| |
| contexts_flat = contexts.reshape(-1, memory_len, d) |
| candidates_flat = candidates.reshape(-1, d) |
|
|
| mapping = defaultdict(lambda: defaultdict(int)) |
| total_windows = contexts_flat.shape[0] |
| for context_arr, candidate_arr in tqdm.tqdm( |
| zip(contexts_flat, candidates_flat), |
| total=total_windows, |
| desc="Building n-gram mapping"): |
| context_key = tuple(map(tuple, context_arr)) |
| candidate_val = tuple(candidate_arr) |
| mapping[context_key][candidate_val] += 1 |
|
|
| return {context: dict(candidates) for context, candidates in mapping.items()} |
|
|
| |
|
|
| def precompute_mapping_lookup(mapping: Dict[Any, Dict[Any, int]]) -> Dict[Any, Tuple[Tuple[Any, ...], np.ndarray]]: |
| """ |
| Converts the mapping into a lookup table: context -> (tuple(candidates), frequencies_array). |
| """ |
| mapping_lookup = {} |
| for context, candidate_dict in tqdm.tqdm(mapping.items(), desc="Precomputing lookup"): |
| candidates = tuple(candidate_dict.keys()) |
| frequencies = np.array(list(candidate_dict.values()), dtype=np.float64) |
| mapping_lookup[context] = (candidates, frequencies) |
| return mapping_lookup |
|
|
| |
|
|
| def build_training_sequences_set(data: np.ndarray) -> set: |
| """ |
| Build a set of training sequences (each as a tuple of triplets) for uniqueness checking. |
| """ |
| return {tuple(map(tuple, seq)) for seq in data} |
|
|
| |
|
|
| def generate_sequence_optimized(mapping_lookup: Dict[Any, Tuple[Tuple[Any, ...], np.ndarray]], |
| training_set: set, |
| memory_len: int, |
| sequence_length: int = 24, |
| max_attempts: int = 1000) -> Optional[Tuple[Tuple[float, float, float], ...]]: |
| """ |
| Autoregressively generate a new, unique sequence using the precomputed mapping lookup. |
| The invariant maintained is: the second element of one triplet is never greater than the first element |
| of the following triplet. |
| |
| Two dynamic adjustments are applied for candidate selection: |
| |
| 1. **Dynamic Repetition Penalty:** |
| For each candidate, count the occurrences of each element in the generated sequence. |
| Rather than a fixed penalty, this repetition penalty scales with the ratio |
| (current_length / sequence_length). In log-space, it subtracts: |
| (current_length / sequence_length) * sum_k(count[k] * log(REP_PENALTY[k]) |
| 2. **Dynamic Spike (Variance) Penalty:** |
| For each candidate, compute the squared difference from the running average for each element. |
| Use a dynamic sigma that is the maximum between the running standard deviation and the baseline. |
| The penalty term for each element is: |
| SPIKE_STRENGTH[k] * ((cand[k] - running_avg[k])^2) / (2 * dynamic_sigma[k]^2) |
| The overall spike penalty is the sum of the three terms and is subtracted from the candidate’s log frequency. |
| |
| The resulting candidate log score is computed as: |
| log(candidate_frequency) - rep_penalty_component - spike_penalty_component |
| A numerical stable softmax is then applied over these scores to determine the probability for drawing a candidate. |
| |
| If no candidate passing the invariant is found, the attempt is aborted. |
| |
| Parameters: |
| mapping_lookup: Precomputed lookup mapping (context → (candidates, frequencies)). |
| training_set: Set of training sequences to ensure uniqueness. |
| memory_len: Number of triplets used as context. |
| sequence_length: Desired length of the generated sequence. |
| max_attempts: Maximum number of generation attempts. |
| |
| Returns: |
| A new unique sequence (tuple of triplets) that respects the invariant, or None if not found. |
| """ |
| mapping_keys = list(mapping_lookup.keys()) |
| num_keys = len(mapping_keys) |
|
|
| for attempt in range(max_attempts): |
| |
| seed = mapping_keys[np.random.randint(0, num_keys)] |
| generated_sequence: List[Tuple[float, float, float]] = list(seed) |
| valid_generation = True |
|
|
| while len(generated_sequence) < sequence_length: |
| last_triplet = generated_sequence[-1] |
| current_context = tuple(generated_sequence[-memory_len:]) |
| candidate_found = False |
|
|
| if current_context in mapping_lookup: |
| candidates, frequencies = mapping_lookup[current_context] |
| |
| |
| valid_indices = [i for i, cand in enumerate(candidates) if cand[0] >= last_triplet[1]] |
| if valid_indices: |
| |
| filtered_freqs = frequencies[valid_indices] |
| filtered_candidates = [candidates[i] for i in valid_indices] |
|
|
| |
| candidate_array = np.array(filtered_candidates, dtype=np.float64) |
| |
| |
| generated_array = np.array(generated_sequence, dtype=np.float64) |
| current_length = generated_array.shape[0] |
| |
| |
| running_avg = np.mean(generated_array, axis=0) |
| running_std = np.std(generated_array, axis=0) |
| |
| dynamic_sigma = np.maximum(running_std, np.array(SPIKE_SIG)) |
| |
| |
| |
| rep_counts = np.array([ |
| [np.sum(generated_array[:, k] == candidate_array[i, k]) for k in range(3)] |
| for i in range(candidate_array.shape[0]) |
| ]) |
| |
| rep_penalty_term = np.sum(rep_counts * np.log(np.array(REP_PENALTY)) * |
| (current_length / sequence_length), axis=1) |
|
|
| |
| |
| diff = candidate_array - running_avg |
| spike_penalty_term = np.sum(np.array(SPIKE_STRENGTH) * (diff**2) / (2 * (dynamic_sigma**2)), |
| axis=1) |
|
|
| |
| |
| log_freq = np.log(filtered_freqs) |
| log_scores = log_freq - rep_penalty_term - spike_penalty_term |
|
|
| |
| max_log = np.max(log_scores) |
| exp_scores = np.exp(log_scores - max_log) |
| probabilities = exp_scores / np.sum(exp_scores) |
| |
| |
| chosen_idx = np.random.choice(len(filtered_candidates), p=probabilities) |
| next_triplet = filtered_candidates[chosen_idx] |
| candidate_found = True |
|
|
| if not candidate_found: |
| |
| valid_generation = False |
| break |
|
|
| generated_sequence.append(next_triplet) |
|
|
| |
| if valid_generation and len(generated_sequence) == sequence_length: |
| new_sequence = tuple(generated_sequence) |
| invariant_ok = all(a[1] <= b[0] for a, b in zip(new_sequence, new_sequence[1:])) |
| if invariant_ok and new_sequence not in training_set: |
| return new_sequence |
|
|
| return None |
|
|
| |
|
|
| def analyze_generated_sequence(sequence: tuple, mapping_lookup: dict, memory_len: int) -> tuple: |
| """ |
| Analyze the generated sequence and return several useful statistics |
| as both a dictionary and as a nicely formatted string report. |
| |
| Statistics Computed: |
| - unigram_diversity: Ratio of unique triplets to total triplets. |
| - repetition_rate: Fraction of repeated triplets. |
| - bigram_diversity: Ratio of unique consecutive pairs to total pairs. |
| - max_consecutive_repetitions: Maximum number of identical consecutive triplets. |
| - avg_candidate_probability (overfit rate): For the transitions (using a sliding window of size |
| MEMORY_LEN as context followed by candidate), the average probability of the chosen candidate |
| as per the training mapping. |
| |
| Additional Analytics: |
| - element_stats: For each element (index 0, 1, 2) in a triplet, includes: |
| * mean, standard deviation, minimum, maximum, and average consecutive absolute difference. |
| - avg_transition_entropy: The average entropy of the candidate distributions (from mapping_lookup) |
| for each transition context. |
| - context_coverage: The fraction of transitions (based on context of length MEMORY_LEN) that are found |
| in the mapping_lookup. |
| |
| Parameters: |
| sequence: Generated sequence (tuple of triplets). |
| mapping_lookup: Precomputed mapping lookup. |
| memory_len: The context length used. |
| |
| Returns: |
| A tuple containing: |
| (stats_dict, stats_report_string) |
| """ |
| stats = {} |
| seq_len = len(sequence) |
| |
| |
| |
| |
| unique_triplets = len(set(sequence)) |
| stats["unigram_diversity"] = unique_triplets / seq_len |
| stats["repetition_rate"] = 1 - (unique_triplets / seq_len) |
| |
| |
| bigrams = [(sequence[i], sequence[i+1]) for i in range(seq_len - 1)] |
| unique_bigrams = len(set(bigrams)) |
| stats["bigram_diversity"] = unique_bigrams / (seq_len - 1) |
| |
| |
| max_consecutive = 1 |
| current_consecutive = 1 |
| for i in range(1, seq_len): |
| if sequence[i] == sequence[i-1]: |
| current_consecutive += 1 |
| if current_consecutive > max_consecutive: |
| max_consecutive = current_consecutive |
| else: |
| current_consecutive = 1 |
| stats["max_consecutive_repetitions"] = max_consecutive |
|
|
| |
| overfit_probs = [] |
| for i in range(memory_len, seq_len): |
| context = tuple(sequence[i - memory_len: i]) |
| candidate = sequence[i] |
| if context in mapping_lookup: |
| candidates, frequencies = mapping_lookup[context] |
| total_freq = np.sum(frequencies) |
| try: |
| idx = candidates.index(candidate) |
| cand_prob = frequencies[idx] / total_freq |
| overfit_probs.append(cand_prob) |
| except ValueError: |
| pass |
| stats["avg_candidate_probability"] = np.mean(overfit_probs) if overfit_probs else None |
|
|
| |
|
|
| |
| seq_arr = np.array(sequence) |
| element_stats = {} |
| for dim in range(seq_arr.shape[1]): |
| values = seq_arr[:, dim] |
| mean_val = np.mean(values) |
| std_val = np.std(values) |
| min_val = np.min(values) |
| max_val = np.max(values) |
| |
| diffs = np.abs(np.diff(values)) |
| avg_diff = np.mean(diffs) if diffs.size > 0 else 0 |
| element_stats[f"element_{dim}"] = { |
| "mean": mean_val, |
| "std": std_val, |
| "min": min_val, |
| "max": max_val, |
| "avg_consecutive_diff": avg_diff, |
| } |
| stats["element_stats"] = element_stats |
|
|
| |
| entropies = [] |
| valid_transitions = 0 |
| for i in range(memory_len, seq_len): |
| context = tuple(sequence[i - memory_len: i]) |
| if context in mapping_lookup: |
| candidates, freqs = mapping_lookup[context] |
| total_freq = np.sum(freqs) |
| if total_freq > 0: |
| probs = freqs / total_freq |
| |
| epsilon = 1e-10 |
| entropy = -np.sum(probs * np.log(probs + epsilon)) |
| entropies.append(entropy) |
| valid_transitions += 1 |
| stats["avg_transition_entropy"] = np.mean(entropies) if entropies else None |
|
|
| |
| total_transitions = seq_len - memory_len |
| stats["context_coverage"] = (valid_transitions / total_transitions) if total_transitions > 0 else None |
|
|
| |
| sep_line = "-" * 60 |
| lines = [] |
| lines.append(sep_line) |
| lines.append("Sequence Analytics Report:") |
| lines.append(sep_line) |
| lines.append("Overall Statistics:") |
| lines.append(f" Unigram Diversity : {stats['unigram_diversity']:.3f}") |
| lines.append(f" Repetition Rate : {stats['repetition_rate']:.3f}") |
| lines.append(f" Bigram Diversity : {stats['bigram_diversity']:.3f}") |
| lines.append(f" Max Consecutive Repetitions: {stats['max_consecutive_repetitions']}") |
| cand_prob = stats["avg_candidate_probability"] |
| cand_prob_str = f"{cand_prob:.3f}" if cand_prob is not None else "N/A" |
| lines.append(f" Avg Candidate Probability : {cand_prob_str}") |
| lines.append("") |
| |
| lines.append("Element-Level Statistics:") |
| for dim in sorted(element_stats.keys()): |
| ed = element_stats[dim] |
| lines.append(f" {dim.capitalize()}:") |
| lines.append(f" Mean : {ed['mean']:.3f}") |
| lines.append(f" Std Dev : {ed['std']:.3f}") |
| lines.append(f" Min : {ed['min']:.3f}") |
| lines.append(f" Max : {ed['max']:.3f}") |
| lines.append(f" Avg Consecutive Diff : {ed['avg_consecutive_diff']:.3f}") |
| lines.append("") |
|
|
| lines.append("Transition Statistics:") |
| avg_entropy = stats["avg_transition_entropy"] |
| entropy_str = f"{avg_entropy:.3f}" if avg_entropy is not None else "N/A" |
| lines.append(f" Average Transition Entropy: {entropy_str}") |
| cc = stats["context_coverage"] |
| cc_str = f"{cc:.3f}" if cc is not None else "N/A" |
| lines.append(f" Context Coverage : {cc_str}") |
| lines.append(sep_line) |
| |
| stats_report = "\n".join(lines) |
| |
| |
| return stats, stats_report |
|
|
| |
|
|
| def autoregressive_generate(start_seq, mel_tones, trg_array, trg_matches_array, num_new_tokens, chunk_len=5): |
| |
| |
| current_seq = np.array(start_seq, dtype=int) |
| trg_array = np.array(trg_array, dtype=int) |
| start_len = len(start_seq) |
|
|
| midx = start_len-1 |
| |
| |
| recent_candidates = deque(maxlen=5) |
|
|
| while (len(current_seq) - start_len) < num_new_tokens: |
|
|
| midx += 1 |
|
|
| |
| context = current_seq[-(chunk_len-1):] |
|
|
| sli = 0 |
| msize = 0 |
|
|
| ctx = context[:, :-1].reshape(1, -1) |
| trg_mat_arr = trg_matches_array |
|
|
| while msize < 8: |
|
|
| print('=== Slice', sli) |
| |
| |
| match_mask = np.all(ctx == trg_mat_arr, axis=1) |
| match_indices = np.where(match_mask)[0] |
|
|
| msize = match_indices.size |
|
|
| if msize < 8: |
| sli += 1 |
| ctx = context[:, :-1].reshape(1, -1)[:, sli:] |
| trg_mat_arr = trg_matches_array[:, :-sli] |
| |
| if match_indices.size == 0: |
| if len(current_seq) > start_len: |
|
|
| |
| tones_chord = sorted([mel_tones[midx]]) |
| new_tuple = [[mel_tones[midx], TMIDIX.ALL_CHORDS_SORTED.index(tones_chord)]] |
| current_seq = np.concatenate((current_seq, new_tuple), axis=0) |
| print('Subbed', midx) |
| continue |
|
|
| |
| available_candidates = [] |
| cseen = [] |
| for idx in match_indices: |
|
|
| if idx not in recent_candidates: |
| |
| candidate_pair = tuple(trg_array[idx].tolist()) |
| if candidate_pair[-1][0] == mel_tones[midx] and candidate_pair[-1][1] not in cseen: |
| available_candidates.append((idx, candidate_pair)) |
| cseen.append(candidate_pair[-1][1]) |
|
|
| |
| if len(available_candidates) < 3: |
| if len(current_seq) >= start_len: |
| |
| tones_chord = sorted([mel_tones[midx]]) |
| new_tuple = [[mel_tones[midx], TMIDIX.ALL_CHORDS_SORTED.index(tones_chord)]] |
| current_seq = np.concatenate((current_seq, new_tuple), axis=0) |
| |
| |
| |
| |
| |
| continue |
|
|
| else: |
| print(len(available_candidates)) |
| |
| chosen_idx, chosen_pair = available_candidates[np.random.choice(len(available_candidates))] |
| new_token = trg_array[chosen_idx][-1] |
| |
| |
| |
| current_seq = np.concatenate((current_seq, new_token[None, :]), axis=0) |
| |
| recent_candidates.append(chosen_idx) |
|
|
| print('Gen seq len', len(current_seq)) |
|
|
| return current_seq |
|
|
| |
|
|
| def minkowski_distance_vector_to_matrix(x: cp.ndarray, X: cp.ndarray, p: float = 3) -> cp.ndarray: |
| |
| """ |
| Computes the Minkowski distance between a 1D CuPy array 'x' and each row of a 2D CuPy array 'X'. |
| |
| Parameters: |
| x (cp.ndarray): A 1D array with shape (n_features,) representing a single vector. |
| X (cp.ndarray): A 2D array with shape (n_samples, n_features) where each row is a vector. |
| p (float): The order of the Minkowski distance. |
| For instance: |
| - p=1 yields the Manhattan distance, |
| - p=2 yields the Euclidean distance, |
| - p=3 yields the Minkowski distance and will use the cube-root implementation, |
| - p=∞ (or cp.inf) gives the Chebyshev distance. |
| |
| Returns: |
| cp.ndarray: A 1D array of length n_samples containing the Minkowski distance between 'x' |
| and the corresponding row in 'X'. |
| """ |
|
|
| |
| |
| diff = cp.abs(X - x) |
| |
| if p == float('inf') or p == cp.inf: |
| |
| distances = cp.max(diff, axis=1) |
| elif p == 3: |
| |
| |
| distances = cp.cbrt(cp.sum(diff ** 3, axis=1)) |
| else: |
| |
| |
| distances = cp.sum(diff ** p, axis=1) ** (1.0 / p) |
| |
| return distances |
|
|
| |
|
|
| def pairwise_minkowski_distance(X: cp.ndarray, p: float = 2) -> cp.ndarray: |
| |
| """ |
| Computes pairwise Minkowski distances for a 2D CuPy array. |
| |
| Parameters: |
| X (cp.ndarray): A 2D array of shape (n_samples, n_features), where each row represents a vector. |
| p (float): The order of the Minkowski distance. |
| For example: |
| - p=1 is the Manhattan distance, |
| - p=2 is the Euclidean distance, |
| - p=∞ (e.g., float('inf') or cp.inf) is the Chebyshev distance. |
| |
| Returns: |
| cp.ndarray: A 2D array of shape (n_samples, n_samples) containing the pairwise Minkowski distances. |
| """ |
| |
| |
| |
| if p == float('inf') or p == cp.inf: |
| |
| return cp.max(cp.abs(X[:, None, :] - X[None, :, :]), axis=-1) |
| else: |
| |
| diff_powered = cp.abs(X[:, None, :] - X[None, :, :]) ** p |
| |
| distances = cp.sum(diff_powered, axis=-1) ** (1.0 / p) |
| |
| return distances |
| |
| |
|
|
| def pairwise_cosine_similarity(X: cp.ndarray, eps: float = 1e-10) -> cp.ndarray: |
| |
| """ |
| Computes the pairwise cosine similarity for a 2D CuPy array. |
| |
| Parameters: |
| X (cp.ndarray): A 2D array of shape (n_samples, n_features) where each row represents a vector. |
| eps (float): A small constant added to the denominator to prevent division by zero. |
| |
| Returns: |
| cp.ndarray: A 2D array of shape (n_samples, n_samples) containing the pairwise cosine similarities. |
| """ |
| |
| |
| |
| dot_product = cp.dot(X, X.T) |
| |
| |
| norms = cp.linalg.norm(X, axis=1) |
| |
| |
| |
| norm_matrix = cp.outer(norms, norms) |
| |
| |
| |
| cosine_similarity = dot_product / (norm_matrix + eps) |
| |
| return cosine_similarity |
|
|
| |
|
|
| print('Module is loaded!') |
| print('Enjoy! :)') |
| print('=' * 70) |
|
|
| |
| |
| |