# Source: AFML/afml/sampling/bootstrapping.py
# (uploaded by akshayboora; "Upload 940 files", commit 669d6a1, verified)
"""
Optimized implementation of logic regarding sequential bootstrapping from chapter 4.
"""
import numpy as np
from numba import njit
def get_ind_matrix(samples_info_sets, price_bars_index):
    """
    Build the binary indicator matrix linking price bars (rows) to samples (columns).

    Only the bars lying between the earliest sample start and the latest
    (non-missing) sample end, both inclusive, get a row. Entry ``[i, j]`` is 1
    when sample ``j`` spans the ``i``-th of those bars and 0 otherwise.

    Args:
        samples_info_sets (pd.Series):
            Triple-barrier events (t1) returned by labeling.get_events.
            Index: start times (t0) as pd.DatetimeIndex.
            Values: end times (t1) as pd.Timestamp (or NaT for open events).
        price_bars_index (pd.DatetimeIndex or array-like):
            Sorted bar timestamps. It may extend before the first sample
            and/or after the last one.

    Returns:
        np.array: Indicator matrix of shape (n_bars_in_range, n_samples),
        dtype int8. An empty ``samples_info_sets`` yields a (0, 0) matrix.
    """
    n = len(samples_info_sets)
    if n == 0:
        return np.zeros((0, 0), dtype=np.int8)

    # Position of each sample's first bar and one-past-last bar, expressed in
    # the coordinates of the FULL price_bars_index.
    starts = np.searchsorted(price_bars_index, samples_info_sets.index, side="left")
    ends = np.searchsorted(
        price_bars_index, samples_info_sets.values, side="right"
    )  # exclusive

    # The matrix only has rows for bars inside [min(t0), max(t1)]. Because the
    # bar index is sorted, the first such bar is the smallest start position
    # and the row count is bounded by the largest end position among samples
    # with a known end time.
    first_bar = int(starts.min())
    has_end = samples_info_sets.notna().to_numpy()
    last_bar = int(ends[has_end].max()) if has_end.any() else first_bar
    m = max(last_bar - first_bar, 0)

    ind_mat = np.zeros((m, n), dtype=np.int8)
    for sample_id in range(n):
        # Re-base onto the trimmed range so row 0 is the first in-range bar;
        # without this, bars preceding the first sample shift every column.
        s = starts[sample_id] - first_bar
        e = min(ends[sample_id] - first_bar, m)
        if e > s:
            ind_mat[s:e, sample_id] = 1
    return ind_mat
def get_active_indices(samples_info_sets, price_bars_index):
    """
    Map each sample to the positions of the price bars it spans.

    Positions refer to ``price_bars_index`` as passed in (no trimming).

    Args:
        samples_info_sets (pd.Series):
            Triple-barrier events (t1) returned by labeling.get_events.
            Index: start times (t0) as pd.DatetimeIndex.
            Values: end times (t1) as pd.Timestamp (or NaT for open events).
        price_bars_index (pd.DatetimeIndex or array-like):
            Sorted bar timestamps (pd.DatetimeIndex or array-like).

    Returns:
        dict:
            Standard Python dictionary mapping sample_id (int, the sample's
            position in ``samples_info_sets``) to a numpy.ndarray of bar
            indices (dtype=int64).
            Example: {0: array([0, 1, 2]), 1: array([], dtype=int64), ...}
    """
    # First covered bar (inclusive) and one-past-last covered bar (exclusive).
    starts = np.searchsorted(price_bars_index, samples_info_sets.index, side="left")
    ends = np.searchsorted(
        price_bars_index, samples_info_sets.values, side="right"
    )

    # np.arange already yields an empty array when end <= start, so no
    # special-casing is needed. int64 is requested explicitly because plain
    # ``int`` maps to a platform-dependent width on some NumPy builds.
    return {
        sample_id: np.arange(start, end, dtype=np.int64)
        for sample_id, (start, end) in enumerate(zip(starts, ends))
    }
def get_ind_mat_average_uniqueness(ind_mat):
    """
    Advances in Financial Machine Learning, Snippet 4.4. page 65.

    Compute Average Uniqueness

    Each non-zero cell of the indicator matrix is divided by the concurrency
    of its bar (the row sum); the result is the mean over all strictly
    positive cells.

    :param ind_mat: (np.matrix) Indicator binary matrix
    :return: (float) Average uniqueness
    """
    indicator = np.array(ind_mat, dtype=np.float64)
    samples_by_bars = indicator.T
    bar_concurrency = indicator.sum(axis=1)

    # Bars that no sample covers keep uniqueness 0 instead of dividing by zero.
    cell_uniqueness = np.zeros_like(samples_by_bars)
    np.divide(
        samples_by_bars,
        bar_concurrency,
        out=cell_uniqueness,
        where=bar_concurrency != 0,
    )

    is_active = cell_uniqueness > 0
    return cell_uniqueness[is_active].mean()
# -----------------
# Packing helpers
# -----------------
def pack_active_indices(active_indices):
    """
    Convert dict/list-of-arrays active_indices into flattened arrays and offsets.

    The bar indices of sample ``i`` (in packed order) end up in
    ``flat_indices[offsets[i]:offsets[i + 1]]``.

    Args:
        active_indices (dict or list): mapping sample_id -> 1D array-like of
            bar indices. For a dict the insertion order of the keys defines
            the packed order; any other sequence is taken to be ordered by
            sample id 0..n-1. Values may be ndarrays, lists, ranges, etc.

    Returns:
        flat_indices (ndarray int64): concatenated bar indices for all samples
        offsets (ndarray int64): start index in flat_indices for each sample (len = n+1)
        lengths (ndarray int64): number of indices per sample (len = n)
        sample_ids (list): list of sample ids in the order used to pack data
    """
    # Preserve sample id ordering to allow mapping between chosen index and original id
    if isinstance(active_indices, dict):
        sample_ids = list(active_indices.keys())
        values = [active_indices[sid] for sid in sample_ids]
    else:
        # assume list-like ordered by sample id 0..n-1
        sample_ids = list(range(len(active_indices)))
        values = list(active_indices)

    # Coerce every entry to a flat int64 array so that plain Python sequences
    # are accepted alongside ndarrays.
    arrays = [np.asarray(v, dtype=np.int64).ravel() for v in values]

    lengths = np.array([a.size for a in arrays], dtype=np.int64)
    offsets = np.zeros(len(arrays) + 1, dtype=np.int64)
    np.cumsum(lengths, out=offsets[1:])

    # np.concatenate rejects an empty list, so handle "no samples" explicitly.
    if arrays:
        flat_indices = np.concatenate(arrays)
    else:
        flat_indices = np.empty(0, dtype=np.int64)
    return flat_indices, offsets, lengths, sample_ids
# ------------------------------
# Numba-accelerated primitives
# ------------------------------
@njit(cache=True)
def _compute_scores_flat(flat_indices, offsets, lengths, concurrency):
    """
    Score every sample by its average uniqueness under the current concurrency.

    For each bar a sample covers, the uniqueness is 1/(c+1) where ``c`` is the
    bar's current concurrency count; the sample's score is the mean of those
    values over its bars.

    Args:
        flat_indices (ndarray int64): concatenated indices
        offsets (ndarray int64): start positions (len = n+1)
        lengths (ndarray int64): counts per sample
        concurrency (ndarray int64): current concurrency counts per bar

    Returns:
        scores (ndarray float64): average uniqueness per sample
    """
    n_samples = offsets.shape[0] - 1
    result = np.empty(n_samples, dtype=np.float64)
    for sample in range(n_samples):
        n_bars = lengths[sample]
        if n_bars == 0:
            # A sample that covers no bars gets zero average uniqueness.
            result[sample] = 0.0
            continue
        acc = 0.0
        for pos in range(offsets[sample], offsets[sample + 1]):
            acc += 1.0 / (concurrency[flat_indices[pos]] + 1.0)
        result[sample] = acc / n_bars
    return result
@njit(cache=True)
def _normalize_to_prob(scores):
    """
    Turn non-negative scores into a probability vector.

    Each score is divided by the sum of all scores. When that sum is exactly
    zero, every entry is set to 1/n instead.
    """
    count = scores.shape[0]
    weights = np.empty(count, dtype=np.float64)

    score_sum = 0.0
    for j in range(count):
        score_sum += scores[j]

    if score_sum != 0.0:
        for j in range(count):
            weights[j] = scores[j] / score_sum
        return weights

    # All scores are zero: fall back to a uniform distribution.
    equal_share = 1.0 / count
    for j in range(count):
        weights[j] = equal_share
    return weights
@njit(cache=True)
def _choose_index_from_cdf(prob, u):
    """
    Map a uniform draw ``u`` in [0,1) to an index by walking the running sum of ``prob``.

    Returns the first index whose cumulative probability exceeds ``u``.
    Doing the scan by hand avoids calling numpy.choice inside numba.
    """
    last = prob.shape[0] - 1
    running = 0.0
    for idx in range(last + 1):
        running += prob[idx]
        if running > u:
            return idx
    # Rounding can leave the total a hair below u; fall back to the last index.
    return last
@njit(cache=True)
def _increment_concurrency_flat(flat_indices, offsets, chosen, concurrency):
    """
    Add one to the concurrency count of every bar covered by sample `chosen`.

    ``concurrency`` is modified in place; nothing is returned.
    """
    pos = offsets[chosen]
    stop = offsets[chosen + 1]
    while pos < stop:
        concurrency[flat_indices[pos]] += 1
        pos += 1
# ------------------------------------------------------------------
# Fully njitted sampling procedure (no Python loops per-iteration)
# ------------------------------------------------------------------
@njit(cache=True)
def _seq_bootstrap_loop(flat_indices, offsets, lengths, concurrency, uniforms):
    """
    Njitted sequential bootstrap loop.

    One sample is drawn per entry of ``uniforms``. Before each draw the
    per-sample scores are recomputed from the current concurrency, normalised
    to probabilities, and the uniform number is mapped to a sample through the
    cumulative distribution. The drawn sample's bars then have their
    concurrency incremented.

    Args:
        flat_indices, offsets, lengths: flattened index layout
        concurrency (ndarray int64): initial concurrency vector (will be mutated)
        uniforms (ndarray float64): pre-drawn uniform random numbers in [0,1), length = sample_length

    Returns:
        chosen_indices (ndarray int64): sequence of chosen sample indices (positions in packed order)
    """
    n_draws = uniforms.shape[0]
    picks = np.empty(n_draws, dtype=np.int64)
    for draw in range(n_draws):
        # Selection probabilities given what has been drawn so far.
        weights = _normalize_to_prob(
            _compute_scores_flat(flat_indices, offsets, lengths, concurrency)
        )
        pick = _choose_index_from_cdf(weights, uniforms[draw])
        picks[draw] = pick
        # Register the draw so the next iteration sees the updated overlap.
        _increment_concurrency_flat(flat_indices, offsets, pick, concurrency)
    return picks
# -----------------------------------------------------------------------------------
# Public wrapper: reproducible, packs data, runs njit sampler, maps to original ids
# -----------------------------------------------------------------------------------
def seq_bootstrap(active_indices, sample_length=None, random_seed=None):
    """
    End-to-end sequential bootstrap using flattened arrays + Numba.

    Implements the sequential bootstrap as described in de Prado's "Advances in Financial
    Machine Learning" Chapter 4: average uniqueness per sample where uniqueness per bar
    is 1/(concurrency+1).

    Args:
        active_indices (dict or list): mapping sample id -> ndarray of bar indices
        sample_length (int or None): requested number of draws; defaults to number of samples
        random_seed (int, RandomState, Generator, or None): seed controlling the
            pre-drawn uniforms. A ``np.random.RandomState`` or
            ``np.random.Generator`` instance is used as-is (and advanced).
            A value that cannot be turned into a valid integer seed falls
            back to an unseeded RandomState.

    Returns:
        phi (list): list of chosen original sample ids (length = sample_length)

    Raises:
        ValueError: if draws are requested but there are no samples, or if any
            bar index is negative.
    """
    # Pack into contiguous arrays and keep mapping from packed index -> original sample id
    flat_indices, offsets, lengths, sample_ids = pack_active_indices(active_indices)
    n_samples = offsets.shape[0] - 1
    if sample_length is None:
        sample_length = n_samples

    # Without this guard the jitted code divides by zero while building the
    # uniform fallback distribution.
    if n_samples == 0 and sample_length > 0:
        raise ValueError("cannot draw from an empty set of samples")

    # Concurrency vector length: bars are indices into price-bar positions.
    # When there are no bars (flat_indices empty), create an empty concurrency of length 0.
    if flat_indices.size == 0:
        n_bars = 0
    else:
        # Negative positions would silently wrap around inside the jitted
        # code and corrupt unrelated concurrency counters.
        if int(flat_indices.min()) < 0:
            raise ValueError("bar indices must be non-negative")
        # max bar index + 1 (bars are zero-based indices)
        n_bars = int(flat_indices.max()) + 1
    concurrency = np.zeros(n_bars, dtype=np.int64)

    # Prepare reproducible uniforms. Accept an integer seed or a ready-made RNG.
    if random_seed is None:
        rng = np.random.RandomState()
    elif isinstance(random_seed, (np.random.RandomState, np.random.Generator)):
        rng = random_seed
    else:
        try:
            rng = np.random.RandomState(int(random_seed))
        except (ValueError, TypeError):
            # Best-effort: an unusable seed yields a non-reproducible run
            # rather than a failure.
            rng = np.random.RandomState()

    # Pre-draw uniforms in Python and pass them into njit function (numba cannot accept RandomState)
    if isinstance(rng, np.random.Generator):
        uniforms = rng.random(sample_length)
    else:
        uniforms = rng.random_sample(sample_length)
    uniforms = uniforms.astype(np.float64, copy=False)

    # Run njit loop (this mutates concurrency but we don't need concurrency afterwards)
    chosen_packed = _seq_bootstrap_loop(
        flat_indices, offsets, lengths, concurrency, uniforms
    )

    # Map packed indices back to original sample ids
    return [sample_ids[i] for i in chosen_packed.tolist()]
# --------------------------------------
# Example usage (for quick manual test)
# --------------------------------------
if __name__ == "__main__":
    # Tiny synthetic check: three samples over overlapping bar ranges
    # ([0,1], [1,2], [2,3]) plus a fourth sample that covers no bars.
    active = {
        sample_id: np.array(bars, dtype=np.int64)
        for sample_id, bars in enumerate(([0, 1], [1, 2], [2, 3], []))
    }
    phi = seq_bootstrap(active, sample_length=6, random_seed=42)
    print("phi:", phi)