Spaces:
No application file
No application file
File size: 4,972 Bytes
669d6a1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 | """
Logic regarding concurrent labels from chapter 4.
"""
import pandas as pd
from afml.util.multiprocess import mp_pandas_obj
def num_concurrent_events(close_series_index, label_endtime, molecule):
"""
Advances in Financial Machine Learning, Snippet 4.1, page 60.
Estimating the Uniqueness of a Label
This function uses close series prices and label endtime (when the first barrier is touched)
to compute the number of concurrent events per bar.
:param close_series_index: (pd.Series) Close prices index
:param label_endtime: (pd.Series) Label endtime series (t1 for triple barrier events)
:param molecule: (an array) A set of datetime index values for processing
:return: (pd.Series) Number concurrent labels for each datetime index
"""
# Find events that span the period [molecule[0], molecule[1]]
label_endtime = label_endtime.fillna(
close_series_index[-1]
) # Unclosed events still must impact other weights
label_endtime = label_endtime[
label_endtime >= molecule[0]
] # Events that end at or after molecule[0]
# Events that start at or before t1[molecule].max()
label_endtime = label_endtime.loc[: label_endtime[molecule].max()]
# Count events spanning a bar
nearest_index = close_series_index.searchsorted(
pd.DatetimeIndex([label_endtime.index[0], label_endtime.max()])
)
count = pd.Series(
0, index=close_series_index[nearest_index[0] : nearest_index[1] + 1]
)
for t_in, t_out in label_endtime.items():
count.loc[t_in:t_out] += 1
return count.loc[molecule[0] : label_endtime[molecule].max()]
def _get_average_uniqueness(label_endtime, num_conc_events, molecule):
"""
Advances in Financial Machine Learning, Snippet 4.2, page 62.
Estimating the Average Uniqueness of a Label
This function uses close series prices and label endtime (when the first barrier is touched)
to compute the number of concurrent events per bar.
:param label_endtime: (pd.Series) Label endtime series (t1 for triple barrier events)
:param num_conc_events: (pd.Series) Number of concurrent labels (output from num_concurrent_events function).
:param molecule: (an array) A set of datetime index values for processing.
:return: (pd.Series) Average uniqueness over event's lifespan.
"""
wght = {}
for t_in, t_out in label_endtime.loc[molecule].items():
wght[t_in] = (1.0 / num_conc_events.loc[t_in:t_out]).mean()
wght = pd.Series(wght)
return wght
def get_num_conc_events(events, close, num_threads=4, verbose=True):
num_conc_events = mp_pandas_obj(
num_concurrent_events,
("molecule", events.index),
num_threads,
close_series_index=close.index,
label_endtime=events["t1"],
verbose=verbose,
)
return num_conc_events
def get_av_uniqueness_from_triple_barrier(
triple_barrier_events, close_series, num_threads, num_conc_events=None, verbose=True
):
"""
This function is the orchestrator to derive average sample uniqueness from a dataset labeled by the triple barrier
method.
:param triple_barrier_events: (pd.DataFrame) Events from labeling.get_events()
:param close_series: (pd.Series) Close prices.
:param num_threads: (int) The number of threads concurrently used by the function.
:param num_conc_events: (pd.Series) Number concurrent labels for each datetime index
:param verbose: (bool) Flag to report progress on asynch jobs
:return: (pd.Series) Average uniqueness over event's lifespan for each index in triple_barrier_events
"""
out = pd.DataFrame()
# Create processing pipeline for num_conc_events
def process_concurrent_events(ce):
"""Process concurrent events to ensure proper format and indexing."""
ce = ce.loc[~ce.index.duplicated(keep="last")]
ce = ce.reindex(close_series.index).fillna(0)
if isinstance(ce, pd.Series):
ce = ce.to_frame()
return ce
# Handle num_conc_events (whether provided or computed)
if num_conc_events is None:
num_conc_events = get_num_conc_events(
triple_barrier_events, close_series, num_threads, verbose
)
processed_ce = process_concurrent_events(num_conc_events)
else:
# Ensure precomputed value matches expected format
processed_ce = process_concurrent_events(num_conc_events.copy())
# Verify index compatibility
missing_in_close = processed_ce.index.difference(close_series.index)
assert missing_in_close.empty, (
f"num_conc_events contains {len(missing_in_close)} indices not in close_series"
)
out["tW"] = mp_pandas_obj(
_get_average_uniqueness,
("molecule", triple_barrier_events.index),
num_threads,
label_endtime=triple_barrier_events["t1"],
num_conc_events=processed_ce,
verbose=verbose,
)
return out
|