# File size: 12,138 Bytes
# 669d6a1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
"""
Optimized implementation of logic regarding sequential bootstrapping from chapter 4.
"""

import numpy as np
from numba import njit


def get_ind_matrix(samples_info_sets, price_bars_index):
    """
    Build the binary indicator matrix of samples over the price bars they span.

    Rows are the price bars that lie between the earliest sample start and the
    latest sample end (both inclusive); columns are the samples in the order of
    ``samples_info_sets``. Entry ``[t, i]`` is 1 when row-bar ``t`` falls inside
    ``[t0_i, t1_i]`` and 0 otherwise.

    Args:
        samples_info_sets (pd.Series):
            Triple-barrier events (t1) returned by labeling.get_events.
            Index: start times (t0) as pd.DatetimeIndex.
            Values: end times (t1) as pd.Timestamp (or NaT for open events).
        price_bars_index (pd.DatetimeIndex or array-like):
            Sorted bar timestamps. Must support elementwise comparison against
            a timestamp (as a pd.DatetimeIndex does).

    Returns:
        np.ndarray: int8 indicator matrix of shape (bars_in_window, n_samples).
    """
    first_start = samples_info_sets.index.min()
    last_end = samples_info_sets.max()

    # Bars inside the overall [first_start, last_end] window become the rows.
    in_window = np.asarray(
        (price_bars_index >= first_start) & (price_bars_index <= last_end)
    )
    m = int(in_window.sum())
    n = len(samples_info_sets)
    ind_mat = np.zeros((m, n), dtype=np.int8)
    if m == 0 or n == 0:
        return ind_mat

    # The bars are sorted, so the window is one contiguous run; the position of
    # its first bar is the shift between full-index positions and matrix rows.
    row_offset = int(np.argmax(in_window))

    # Positions of each sample's first bar and one-past-last bar in the FULL index.
    starts = np.searchsorted(price_bars_index, samples_info_sets.index, side="left")
    ends = np.searchsorted(
        price_bars_index, samples_info_sets.values, side="right"
    )  # exclusive

    # Re-base onto matrix rows. Without this shift every column is displaced
    # (and truncated) whenever bars exist before the first sample start.
    starts = np.clip(starts - row_offset, 0, m)
    ends = np.clip(ends - row_offset, 0, m)

    for sample_id, (s, e) in enumerate(zip(starts, ends)):
        if e > s:
            ind_mat[s:e, sample_id] = 1

    return ind_mat


def get_active_indices(samples_info_sets, price_bars_index):
    """
    Map each sample to the positions of the price bars it spans.

    Positions refer to ``price_bars_index`` as passed in (no trimming): sample
    ``i`` covers every bar ``b`` with ``t0_i <= b <= t1_i``.

    Args:
        samples_info_sets (pd.Series):
            Triple-barrier events (t1) returned by labeling.get_events.
            Index: start times (t0) as pd.DatetimeIndex.
            Values: end times (t1) as pd.Timestamp (or NaT for open events).
        price_bars_index (pd.DatetimeIndex or array-like):
            Sorted bar timestamps (pd.DatetimeIndex or array-like).

    Returns:
        dict:
            Standard Python dictionary mapping sample_id (int, positional order
            of ``samples_info_sets``) to a numpy.ndarray of bar positions
            (dtype=int64). Samples covering no bar map to an empty array.
            Example: {0: array([0, 1, 2]), 1: array([], dtype=int64), ...}
    """
    # First covered bar (inclusive) and one-past-last covered bar (exclusive).
    starts = np.searchsorted(price_bars_index, samples_info_sets.index, side="left")
    ends = np.searchsorted(
        price_bars_index, samples_info_sets.values, side="right"
    )  # exclusive

    # np.arange already yields an empty array when e <= s, so no branch is needed.
    # np.int64 is requested explicitly: plain ``int`` is platform-dependent.
    return {
        sample_id: np.arange(s, e, dtype=np.int64)
        for sample_id, (s, e) in enumerate(zip(starts, ends))
    }


def get_ind_mat_average_uniqueness(ind_mat):
    """
    Advances in Financial Machine Learning, Snippet 4.4. page 65.

    Average uniqueness of an indicator matrix.

    Each entry is divided by the concurrency of its bar (the row sum); the
    result is the mean over all strictly positive quotients. Bars with zero
    concurrency contribute nothing.

    Args:
        ind_mat (np.matrix or array-like): Binary indicator matrix with bars
            as rows and samples as columns.

    Returns:
        float: Average uniqueness.
    """
    indicator = np.array(ind_mat, dtype=np.float64)
    bar_concurrency = indicator.sum(axis=1)
    by_sample = indicator.T

    # Pre-filled with zeros so bars with zero concurrency are left at 0
    # instead of producing a division by zero.
    share = np.zeros_like(by_sample)
    np.divide(by_sample, bar_concurrency, out=share, where=bar_concurrency != 0)

    positive_shares = share[share > 0]
    return positive_shares.mean()


# -----------------
# Packing helpers
# -----------------
def pack_active_indices(active_indices):
    """
    Convert dict/list-of-arrays active_indices into flattened arrays and offsets.

    Args:
        active_indices (dict or list): mapping sample_id -> 1D array-like of bar
            indices (ndarray, list or tuple). A non-dict input is treated as
            ordered by sample id 0..n-1.

    Returns:
        flat_indices (ndarray int64): concatenated bar indices for all samples
        offsets (ndarray int64): start index in flat_indices for each sample (len = n+1)
        lengths (ndarray int64): number of indices per sample (len = n)
        sample_ids (list): list of sample ids in the order used to pack data
    """
    # Preserve sample id ordering to allow mapping between chosen index and original id
    if isinstance(active_indices, dict):
        sample_ids = list(active_indices)
        raw_values = list(active_indices.values())
    else:
        raw_values = list(active_indices)
        sample_ids = list(range(len(raw_values)))

    # Coerce every entry to a flat int64 array so plain lists/tuples work too.
    values = [np.asarray(v, dtype=np.int64).ravel() for v in raw_values]

    lengths = np.array([v.size for v in values], dtype=np.int64)
    offsets = np.zeros(len(values) + 1, dtype=np.int64)
    offsets[1:] = np.cumsum(lengths)

    # np.concatenate rejects an empty sequence, so handle "no samples" explicitly.
    if values:
        flat_indices = np.concatenate(values)
    else:
        flat_indices = np.empty(0, dtype=np.int64)

    return flat_indices, offsets, lengths, sample_ids


# ------------------------------
# Numba-accelerated primitives
# ------------------------------
@njit(cache=True)
def _compute_scores_flat(flat_indices, offsets, lengths, concurrency):
    """
    Score every sample by its average uniqueness under the current concurrency.

    For each bar covered by a sample the uniqueness is 1/(c+1), where c is the
    bar's current concurrency count; a sample's score is the mean of those
    values over its bars. Samples covering no bars score 0.

    Args:
        flat_indices (ndarray int64): concatenated bar indices of all samples
        offsets (ndarray int64): start positions into flat_indices (len = n+1)
        lengths (ndarray int64): number of bars per sample
        concurrency (ndarray int64): current concurrency counts per bar

    Returns:
        scores (ndarray float64): average uniqueness per sample
    """
    n_samples = offsets.shape[0] - 1
    # Zero-initialised so samples without bars keep a score of 0.
    scores = np.zeros(n_samples, dtype=np.float64)

    for sample in range(n_samples):
        n_bars = lengths[sample]
        if n_bars == 0:
            continue

        acc = 0.0
        for pos in range(offsets[sample], offsets[sample + 1]):
            # "+1" counts the candidate sample itself on top of earlier draws.
            acc += 1.0 / (concurrency[flat_indices[pos]] + 1.0)
        scores[sample] = acc / n_bars

    return scores


@njit(cache=True)
def _normalize_to_prob(scores):
    """
    Turn non-negative scores into a probability vector.

    Each score is divided by the sum of all scores. When that sum is exactly
    zero, the uniform distribution over all entries is returned instead.
    """
    count = scores.shape[0]

    # Sequential accumulation, one element at a time.
    acc = 0.0
    for k in range(count):
        acc += scores[k]

    if acc == 0.0:
        # No score mass at all: every entry gets the same probability.
        return np.full(count, 1.0 / count)

    return scores / acc


@njit(cache=True)
def _choose_index_from_cdf(prob, u):
    """
    Convert a uniform random number u in [0,1) to an index using the cumulative distribution.

    This avoids calling numpy.choice inside numba and is efficient.

    Args:
        prob (ndarray float64): non-negative probabilities, expected to sum to 1
        u (float): uniform draw in [0, 1)

    Returns:
        int: the first index whose cumulative probability exceeds ``u``. If
        rounding leaves the cumulative sum at or below ``u``, the last index
        with positive probability is returned, so a zero-probability entry is
        never selected by the fallback. If no entry is positive, ``n - 1``.
    """
    n = prob.shape[0]
    cum = 0.0
    last_positive = -1
    for i in range(n):
        p = prob[i]
        # Zero-probability entries cannot move the cumulative sum, so skipping
        # them changes nothing except which index the fallback may return.
        if p > 0.0:
            cum += p
            last_positive = i
            if u < cum:
                return i

    # numerical fallback: cumulative sum fell short of u due to rounding
    if last_positive >= 0:
        return last_positive
    return n - 1


@njit(cache=True)
def _increment_concurrency_flat(flat_indices, offsets, chosen, concurrency):
    """
    Add one to the concurrency count of every bar covered by sample `chosen`.

    `concurrency` is modified in place; nothing is returned.
    """
    # The bars of sample `chosen` occupy flat_indices[offsets[chosen]:offsets[chosen + 1]].
    for pos in range(offsets[chosen], offsets[chosen + 1]):
        concurrency[flat_indices[pos]] += 1


# ------------------------------------------------------------------
# Fully njitted sampling procedure (no Python loops per-iteration)
# ------------------------------------------------------------------
@njit(cache=True)
def _seq_bootstrap_loop(flat_indices, offsets, lengths, concurrency, uniforms):
    """
    Compiled core of the sequential bootstrap.

    One draw is made per entry of `uniforms`. Before each draw the selection
    probabilities are recomputed from the current concurrency; after it, the
    concurrency of the bars covered by the drawn sample is incremented.

    Args:
        flat_indices, offsets, lengths: flattened index layout
        concurrency (ndarray int64): initial concurrency vector (mutated in place)
        uniforms (ndarray float64): pre-drawn uniform random numbers in [0,1),
            one per requested draw

    Returns:
        ndarray int64: drawn sample positions (in packed order), one per uniform
    """
    n_draws = uniforms.shape[0]
    picks = np.empty(n_draws, dtype=np.int64)

    for draw in range(n_draws):
        # Probabilities depend on every earlier pick through `concurrency`.
        weights = _normalize_to_prob(
            _compute_scores_flat(flat_indices, offsets, lengths, concurrency)
        )

        pick = _choose_index_from_cdf(weights, uniforms[draw])
        picks[draw] = pick

        # Register the pick so overlapping samples become less likely next time.
        _increment_concurrency_flat(flat_indices, offsets, pick, concurrency)

    return picks


# -----------------------------------------------------------------------------------
# Public wrapper: reproducible, packs data, runs njit sampler, maps to original ids
# -----------------------------------------------------------------------------------
def seq_bootstrap(active_indices, sample_length=None, random_seed=None):
    """
    End-to-end sequential bootstrap using flattened arrays + Numba.

    Implements the sequential bootstrap as described in de Prado's "Advances in Financial
    Machine Learning" Chapter 4: average uniqueness per sample where uniqueness per bar
    is 1/(concurrency+1).

    Args:
        active_indices (dict or list): mapping sample id -> ndarray of bar indices
        sample_length (int or None): requested number of draws; defaults to number of samples
        random_seed (int, RandomState, Generator, or None): source of the
            pre-drawn uniforms. A ``np.random.RandomState`` or
            ``np.random.Generator`` is used as-is (and advanced); anything else
            is coerced with ``int()`` and used as a seed. ``None`` gives an
            unseeded generator.

    Returns:
        phi (list): list of chosen original sample ids (length = sample_length)

    Raises:
        ValueError: if draws are requested but ``active_indices`` holds no samples.

    Warns:
        RuntimeWarning: if ``random_seed`` cannot be used as a seed; an
            unseeded generator is used instead, so the result is not reproducible.
    """
    import warnings  # local import keeps this function self-contained

    # Pack into contiguous arrays and keep mapping from packed index -> original sample id
    flat_indices, offsets, lengths, sample_ids = pack_active_indices(active_indices)
    n_samples = offsets.shape[0] - 1

    if sample_length is None:
        sample_length = n_samples

    # Fail early with a clear message instead of deep inside the compiled loop.
    if n_samples == 0 and sample_length > 0:
        raise ValueError(
            "Cannot draw a sequential bootstrap sample: active_indices is empty"
        )

    # Concurrency vector length: max bar index + 1 (bars are zero-based
    # positions), or 0 when no sample covers any bar.
    T = int(flat_indices.max()) + 1 if flat_indices.size else 0
    concurrency = np.zeros(T, dtype=np.int64)

    # Prepare reproducible uniforms.
    if random_seed is None:
        rng = np.random.RandomState()
    elif isinstance(random_seed, (np.random.RandomState, np.random.Generator)):
        rng = random_seed
    else:
        try:
            rng = np.random.RandomState(int(random_seed))
        except (ValueError, TypeError):
            # Keep the best-effort fallback, but make the loss of
            # reproducibility visible to the caller.
            warnings.warn(
                f"random_seed={random_seed!r} is not a valid seed; "
                "falling back to an unseeded RandomState",
                RuntimeWarning,
                stacklevel=2,
            )
            rng = np.random.RandomState()

    # Pre-draw uniforms in Python and pass them into the njit function
    # (numba cannot accept RandomState). Both calls return float64 in [0, 1).
    if isinstance(rng, np.random.Generator):
        uniforms = rng.random(sample_length)
    else:
        uniforms = rng.random_sample(sample_length)

    # Run njit loop (this mutates concurrency but we don't need concurrency afterwards)
    chosen_packed = _seq_bootstrap_loop(
        flat_indices, offsets, lengths, concurrency, uniforms
    )

    # Map packed indices back to original sample ids
    phi = [sample_ids[i] for i in chosen_packed.tolist()]

    return phi


# --------------------------------------
# Example usage (for quick manual test)
# --------------------------------------
if __name__ == "__main__":
    # Smoke test on 4 synthetic samples given as half-open bar ranges:
    # samples 0, 1 and 2 cover bars [0,1], [1,2] and [2,3] (neighbours overlap
    # by one bar); sample 3 covers no bars at all.
    bar_ranges = [(0, 2), (1, 3), (2, 4), (0, 0)]
    active = {
        sample_id: np.arange(lo, hi, dtype=np.int64)
        for sample_id, (lo, hi) in enumerate(bar_ranges)
    }

    # Fixed seed so repeated runs print the same draw sequence.
    phi = seq_bootstrap(active, sample_length=6, random_seed=42)
    print("phi:", phi)