Spaces:
No application file
No application file
| """ | |
| Logic regarding concurrent labels from chapter 4. | |
| """ | |
import numpy as np
import pandas as pd

from afml.util.multiprocess import mp_pandas_obj
def num_concurrent_events(close_series_index, label_endtime, molecule):
    """
    Advances in Financial Machine Learning, Snippet 4.1, page 60.

    Estimating the Uniqueness of a Label

    This function uses the close series index and label endtime (when the first barrier is touched)
    to compute the number of concurrent events per bar. Events whose end time is missing (NaT) are
    treated as lasting until the last bar of ``close_series_index``.

    :param close_series_index: (pd.DatetimeIndex) Close prices index (assumed sorted ascending).
    :param label_endtime: (pd.Series) Label endtime series (t1 for triple barrier events), indexed by
        event start time (assumed sorted by that index).
    :param molecule: (an array) A set of datetime index values (event start times) for processing.
    :return: (pd.Series) Number of concurrent labels for each bar from ``molecule[0]`` up to the
        latest end time of the events in ``molecule``.
    """
    # Unclosed events still must impact other weights.
    label_endtime = label_endtime.fillna(close_series_index[-1])
    # Keep events that end at or after molecule[0].
    label_endtime = label_endtime[label_endtime >= molecule[0]]
    # Latest end time among the molecule's own events (label lookup, done once).
    molecule_end = label_endtime.loc[molecule].max()
    # Keep events that start at or before that time.
    label_endtime = label_endtime.loc[:molecule_end]

    # Bars spanned by the remaining events.
    nearest_index = close_series_index.searchsorted(
        pd.DatetimeIndex([label_endtime.index[0], label_endtime.max()])
    )
    bars = close_series_index[nearest_index[0] : nearest_index[1] + 1]

    # Count events per bar with a difference array instead of one .loc slice update
    # per event. On a sorted index, count.loc[t_in:t_out] covers the positions
    # [searchsorted(t_in, "left"), searchsorted(t_out, "right")).
    starts = bars.searchsorted(label_endtime.index, side="left")
    stops = bars.searchsorted(pd.DatetimeIndex(label_endtime), side="right")
    covers_a_bar = stops > starts  # an empty slice (e.g. t_out < t_in) adds nothing
    size = len(bars) + 1
    delta = np.bincount(starts[covers_a_bar], minlength=size) - np.bincount(
        stops[covers_a_bar], minlength=size
    )
    count = pd.Series(np.cumsum(delta[:-1], dtype=np.int64), index=bars)
    return count.loc[molecule[0] : molecule_end]
| def _get_average_uniqueness(label_endtime, num_conc_events, molecule): | |
| """ | |
| Advances in Financial Machine Learning, Snippet 4.2, page 62. | |
| Estimating the Average Uniqueness of a Label | |
| This function uses close series prices and label endtime (when the first barrier is touched) | |
| to compute the number of concurrent events per bar. | |
| :param label_endtime: (pd.Series) Label endtime series (t1 for triple barrier events) | |
| :param num_conc_events: (pd.Series) Number of concurrent labels (output from num_concurrent_events function). | |
| :param molecule: (an array) A set of datetime index values for processing. | |
| :return: (pd.Series) Average uniqueness over event's lifespan. | |
| """ | |
| wght = {} | |
| for t_in, t_out in label_endtime.loc[molecule].items(): | |
| wght[t_in] = (1.0 / num_conc_events.loc[t_in:t_out]).mean() | |
| wght = pd.Series(wght) | |
| return wght | |
def get_num_conc_events(events, close, num_threads=4, verbose=True):
    """
    Count concurrent events per bar by running ``num_concurrent_events`` through
    ``mp_pandas_obj`` over molecules of ``events.index``.

    :param events: (pd.DataFrame) Events with a ``t1`` column of label end times, indexed by
        event start time.
    :param close: (pd.Series) Close prices; only its index is used.
    :param num_threads: (int) Number of threads handed to ``mp_pandas_obj``.
    :param verbose: (bool) Progress-reporting flag forwarded to ``mp_pandas_obj``.
    :return: Whatever ``mp_pandas_obj`` assembles from the per-molecule results — presumably a
        pd.Series of counts indexed by bar, possibly with repeated boundary bars (TODO confirm).
    """
    molecule_spec = ("molecule", events.index)
    return mp_pandas_obj(
        num_concurrent_events,
        molecule_spec,
        num_threads,
        verbose=verbose,
        label_endtime=events["t1"],
        close_series_index=close.index,
    )
def get_av_uniqueness_from_triple_barrier(
    triple_barrier_events, close_series, num_threads, num_conc_events=None, verbose=True
):
    """
    This function is the orchestrator to derive average sample uniqueness from a dataset labeled by the triple barrier
    method.

    :param triple_barrier_events: (pd.DataFrame) Events from labeling.get_events()
    :param close_series: (pd.Series) Close prices.
    :param num_threads: (int) The number of threads concurrently used by the function.
    :param num_conc_events: (pd.Series) Number concurrent labels for each datetime index. If None, it is computed
        with get_num_conc_events. A single-column pd.DataFrame is also accepted.
    :param verbose: (bool) Flag to report progress on asynch jobs
    :return: (pd.DataFrame) Column 'tW' with the average uniqueness over each event's lifespan, for each index in
        triple_barrier_events
    :raises ValueError: If a precomputed num_conc_events has index values that are not in close_series, or is a
        DataFrame with more than one column.
    """

    def process_concurrent_events(ce):
        """Return ``ce`` as a de-duplicated Series aligned to ``close_series.index`` (uncovered bars -> 0)."""
        if isinstance(ce, pd.DataFrame):
            if ce.shape[1] != 1:
                raise ValueError(
                    f"num_conc_events must have exactly one column, got {ce.shape[1]}"
                )
            # Keep a Series: _get_average_uniqueness averages 1/count per event, and a
            # DataFrame there yields a per-column Series for every event instead of a float.
            ce = ce.iloc[:, 0]
        # Adjacent molecules can both report the same boundary bar; keep the last report.
        ce = ce.loc[~ce.index.duplicated(keep="last")]
        return ce.reindex(close_series.index).fillna(0)

    if num_conc_events is None:
        num_conc_events = get_num_conc_events(
            triple_barrier_events, close_series, num_threads, verbose
        )
    else:
        # Validate before reindexing: reindex would silently drop these rows, which is
        # why a check placed after processing could never fail.
        missing_in_close = num_conc_events.index.difference(close_series.index)
        if not missing_in_close.empty:
            raise ValueError(
                f"num_conc_events contains {len(missing_in_close)} indices not in close_series"
            )

    processed_ce = process_concurrent_events(num_conc_events)

    out = pd.DataFrame()
    out["tW"] = mp_pandas_obj(
        _get_average_uniqueness,
        ("molecule", triple_barrier_events.index),
        num_threads,
        label_endtime=triple_barrier_events["t1"],
        num_conc_events=processed_ce,
        verbose=verbose,
    )
    return out