Spaces:
No application file
No application file
| """ | |
| Optimized implementation of logic regarding sequential bootstrapping from chapter 4. | |
| """ | |
| import numpy as np | |
| from numba import njit | |
| def get_ind_matrix(samples_info_sets, price_bars_index): | |
| """ | |
| Build an indicator mapping from each sample to the bar indices it influences. | |
| Args: | |
| samples_info_sets (pd.Series): | |
| Triple-barrier events (t1) returned by labeling.get_events. | |
| Index: start times (t0) as pd.DatetimeIndex. | |
| Values: end times (t1) as pd.Timestamp (or NaT for open events). | |
| price_bars_index (pd.DatetimeIndex or array-like): | |
| Sorted bar timestamps (pd.DatetimeIndex or array-like). | |
| Returns: | |
| np.array: Indicator matrix | |
| """ | |
| m = sum( | |
| (price_bars_index >= samples_info_sets.index.min()) | |
| & (price_bars_index <= samples_info_sets.max()) | |
| ) | |
| n = len(samples_info_sets) | |
| ind_mat = np.zeros((m, n), dtype=np.int8) | |
| # precompute searchsorted positions to restrict scanning range | |
| starts = np.searchsorted(price_bars_index, samples_info_sets.index, side="left") | |
| ends = np.searchsorted( | |
| price_bars_index, samples_info_sets.values, side="right" | |
| ) # exclusive | |
| for sample_id in range(n): | |
| s = starts[sample_id] | |
| e = ends[sample_id] | |
| if e > s: | |
| ind_mat[s:e, sample_id] = 1 | |
| return ind_mat | |
| def get_active_indices(samples_info_sets, price_bars_index): | |
| """ | |
| Build an indicator mapping from each sample to the bar indices it influences. | |
| Args: | |
| samples_info_sets (pd.Series): | |
| Triple-barrier events (t1) returned by labeling.get_events. | |
| Index: start times (t0) as pd.DatetimeIndex. | |
| Values: end times (t1) as pd.Timestamp (or NaT for open events). | |
| price_bars_index (pd.DatetimeIndex or array-like): | |
| Sorted bar timestamps (pd.DatetimeIndex or array-like). | |
| Returns: | |
| dict: | |
| Standard Python dictionary mapping sample_id (int) to a numpy.ndarray of | |
| bar indices (dtype=int64). Example: {0: array([0,1,2], dtype=int64), 1: array([], dtype=int64), ...} | |
| """ | |
| n = len(samples_info_sets) | |
| active_indices = {} | |
| # precompute searchsorted positions to restrict scanning range | |
| starts = np.searchsorted(price_bars_index, samples_info_sets.index, side="left") | |
| ends = np.searchsorted( | |
| price_bars_index, samples_info_sets.values, side="right" | |
| ) # exclusive | |
| for sample_id in range(n): | |
| s = starts[sample_id] | |
| e = ends[sample_id] | |
| if e > s: | |
| active_indices[sample_id] = np.arange(s, e, dtype=int) | |
| else: | |
| active_indices[sample_id] = np.empty(0, dtype=int) | |
| return active_indices | |
| def get_ind_mat_average_uniqueness(ind_mat): | |
| """ | |
| Advances in Financial Machine Learning, Snippet 4.4. page 65. | |
| Compute Average Uniqueness | |
| Average uniqueness from indicator matrix | |
| :param ind_mat: (np.matrix) Indicator binary matrix | |
| :return: (float) Average uniqueness | |
| """ | |
| ind_mat = np.array(ind_mat, dtype=np.float64) | |
| concurrency = ind_mat.sum(axis=1) | |
| uniqueness = np.divide( | |
| ind_mat.T, concurrency, out=np.zeros_like(ind_mat.T), where=concurrency != 0 | |
| ) | |
| avg_uniqueness = uniqueness[uniqueness > 0].mean() | |
| return avg_uniqueness | |
| # ----------------- | |
| # Packing helpers | |
| # ----------------- | |
| def pack_active_indices(active_indices): | |
| """ | |
| Convert dict/list-of-arrays active_indices into flattened arrays and offsets. | |
| Args: | |
| active_indices (dict or list): mapping sample_id -> 1D ndarray of bar indices | |
| Returns: | |
| flat_indices (ndarray int64): concatenated bar indices for all samples | |
| offsets (ndarray int64): start index in flat_indices for each sample (len = n+1) | |
| lengths (ndarray int64): number of indices per sample (len = n) | |
| sample_ids (list): list of sample ids in the order used to pack data | |
| """ | |
| # Preserve sample id ordering to allow mapping between chosen index and original id | |
| if isinstance(active_indices, dict): | |
| sample_ids = list(active_indices.keys()) | |
| values = [active_indices[sid] for sid in sample_ids] | |
| else: | |
| # assume list-like ordered by sample id 0..n-1 | |
| sample_ids = list(range(len(active_indices))) | |
| values = list(active_indices) | |
| lengths = np.array([v.size for v in values], dtype=np.int64) | |
| offsets = np.empty(len(values) + 1, dtype=np.int64) | |
| offsets[0] = 0 | |
| offsets[1:] = np.cumsum(lengths) | |
| total = int(offsets[-1]) | |
| if total == 0: | |
| flat_indices = np.empty(0, dtype=np.int64) | |
| else: | |
| flat_indices = np.empty(total, dtype=np.int64) | |
| pos = 0 | |
| for v in values: | |
| l = v.size | |
| if l: | |
| flat_indices[pos : pos + l] = v | |
| pos += l | |
| return flat_indices, offsets, lengths, sample_ids | |
| # ------------------------------ | |
| # Numba-accelerated primitives | |
| # ------------------------------ | |
| def _compute_scores_flat(flat_indices, offsets, lengths, concurrency): | |
| """ | |
| Compute average uniqueness for each sample using flattened indices. | |
| This follows de Prado's approach: for each bar in a sample, compute uniqueness as 1/(c+1), | |
| then average across all bars in that sample. | |
| Args: | |
| flat_indices (ndarray int64): concatenated indices | |
| offsets (ndarray int64): start positions (len = n+1) | |
| lengths (ndarray int64): counts per sample | |
| concurrency (ndarray int64): current concurrency counts per bar | |
| Returns: | |
| scores (ndarray float64): average uniqueness per sample | |
| """ | |
| n = offsets.shape[0] - 1 | |
| scores = np.empty(n, dtype=np.float64) | |
| for i in range(n): | |
| s = offsets[i] | |
| e = offsets[i + 1] | |
| length = lengths[i] | |
| if length == 0: | |
| # If a sample covers no bars, assign zero average uniqueness | |
| scores[i] = 0.0 | |
| else: | |
| # Compute uniqueness = 1/(c+1) for each bar, then average | |
| sum_uniqueness = 0.0 | |
| for k in range(s, e): | |
| bar = flat_indices[k] | |
| c = concurrency[bar] | |
| uniqueness = 1.0 / (c + 1.0) | |
| sum_uniqueness += uniqueness | |
| avg_uniqueness = sum_uniqueness / length | |
| scores[i] = avg_uniqueness | |
| return scores | |
| def _normalize_to_prob(scores): | |
| """ | |
| Normalize non-negative scores to a probability vector. If all zero, return uniform. | |
| """ | |
| n = scores.shape[0] | |
| total = 0.0 | |
| for i in range(n): | |
| total += scores[i] | |
| prob = np.empty(n, dtype=np.float64) | |
| if total == 0.0: | |
| # fallback to uniform distribution | |
| uni = 1.0 / n | |
| for i in range(n): | |
| prob[i] = uni | |
| else: | |
| for i in range(n): | |
| prob[i] = scores[i] / total | |
| return prob | |
| def _choose_index_from_cdf(prob, u): | |
| """ | |
| Convert a uniform random number u in [0,1) to an index using the cumulative distribution. | |
| This avoids calling numpy.choice inside numba and is efficient. | |
| """ | |
| n = prob.shape[0] | |
| cum = 0.0 | |
| for i in range(n): | |
| cum += prob[i] | |
| if u < cum: | |
| return i | |
| # numerical fallback: return last index | |
| return n - 1 | |
| def _increment_concurrency_flat(flat_indices, offsets, chosen, concurrency): | |
| """ | |
| Increment concurrency for the bars covered by sample `chosen`. | |
| """ | |
| s = offsets[chosen] | |
| e = offsets[chosen + 1] | |
| for k in range(s, e): | |
| bar = flat_indices[k] | |
| concurrency[bar] += 1 | |
| # ------------------------------------------------------------------ | |
| # Fully njitted sampling procedure (no Python loops per-iteration) | |
| # ------------------------------------------------------------------ | |
| def _seq_bootstrap_loop(flat_indices, offsets, lengths, concurrency, uniforms): | |
| """ | |
| Njitted sequential bootstrap loop. | |
| Args: | |
| flat_indices, offsets, lengths: flattened index layout | |
| concurrency (ndarray int64): initial concurrency vector (will be mutated) | |
| uniforms (ndarray float64): pre-drawn uniform random numbers in [0,1), length = sample_length | |
| Returns: | |
| chosen_indices (ndarray int64): sequence of chosen sample indices (positions in packed order) | |
| """ | |
| sample_length = uniforms.shape[0] | |
| chosen_indices = np.empty(sample_length, dtype=np.int64) | |
| for it in range(sample_length): | |
| # compute scores and probabilities given current concurrency | |
| scores = _compute_scores_flat(flat_indices, offsets, lengths, concurrency) | |
| prob = _normalize_to_prob(scores) | |
| # map uniform to a sample index | |
| u = uniforms[it] | |
| idx = _choose_index_from_cdf(prob, u) | |
| chosen_indices[it] = idx | |
| # update concurrency for selected sample | |
| _increment_concurrency_flat(flat_indices, offsets, idx, concurrency) | |
| return chosen_indices | |
| # ----------------------------------------------------------------------------------- | |
| # Public wrapper: reproducible, packs data, runs njit sampler, maps to original ids | |
| # ----------------------------------------------------------------------------------- | |
| def seq_bootstrap(active_indices, sample_length=None, random_seed=None): | |
| """ | |
| End-to-end sequential bootstrap using flattened arrays + Numba. | |
| Implements the sequential bootstrap as described in de Prado's "Advances in Financial | |
| Machine Learning" Chapter 4: average uniqueness per sample where uniqueness per bar | |
| is 1/(concurrency+1). | |
| Args: | |
| active_indices (dict or list): mapping sample id -> ndarray of bar indices | |
| sample_length (int or None): requested number of draws; defaults to number of samples | |
| random_seed (int, RandomState, or None): seed controlling the pre-drawn uniforms | |
| Returns: | |
| phi (list): list of chosen original sample ids (length = sample_length) | |
| """ | |
| # Pack into contiguous arrays and keep mapping from packed index -> original sample id | |
| flat_indices, offsets, lengths, sample_ids = pack_active_indices(active_indices) | |
| n_samples = offsets.shape[0] - 1 | |
| if sample_length is None: | |
| sample_length = n_samples | |
| # Concurrency vector length: bars are indices into price-bar positions. | |
| # When there are no bars (flat_indices empty), create an empty concurrency of length 0. | |
| if flat_indices.size == 0: | |
| T = 0 | |
| else: | |
| # max bar index + 1 (bars are zero-based indices) | |
| T = int(flat_indices.max()) + 1 | |
| concurrency = np.zeros(T, dtype=np.int64) | |
| # Prepare reproducible uniforms. Accept either integer seed or RandomState. | |
| if random_seed is None: | |
| rng = np.random.RandomState() | |
| elif isinstance(random_seed, np.random.RandomState): | |
| rng = random_seed | |
| else: | |
| try: | |
| rng = np.random.RandomState(int(random_seed)) | |
| except (ValueError, TypeError): | |
| rng = np.random.RandomState() | |
| # Pre-draw uniforms in Python and pass them into njit function (numba cannot accept RandomState) | |
| uniforms = rng.random_sample(sample_length).astype(np.float64) | |
| # Run njit loop (this mutates concurrency but we don't need concurrency afterwards) | |
| chosen_packed = _seq_bootstrap_loop( | |
| flat_indices, offsets, lengths, concurrency, uniforms | |
| ) | |
| # Map packed indices back to original sample ids | |
| phi = [sample_ids[int(i)] for i in chosen_packed.tolist()] | |
| return phi | |
| # -------------------------------------- | |
| # Example usage (for quick manual test) | |
| # -------------------------------------- | |
| if __name__ == "__main__": | |
| # small synthetic test: 4 samples with overlapping bar ranges | |
| # sample 0 covers bars [0,1], sample 1 covers [1,2], sample 2 covers [2,3], sample 3 covers [] | |
| active = { | |
| 0: np.array([0, 1], dtype=np.int64), | |
| 1: np.array([1, 2], dtype=np.int64), | |
| 2: np.array([2, 3], dtype=np.int64), | |
| 3: np.empty(0, dtype=np.int64), | |
| } | |
| phi = seq_bootstrap(active, sample_length=6, random_seed=42) | |
| print("phi:", phi) | |