from typing import Any, Optional, Union

import numpy as np
import pandas as pd
from loguru import logger

from ..util.misc import (
    flatten_column_names,
    log_df_info,
    optimize_dtypes,
    set_resampling_freq,
)


def calculate_ticks_per_period(
    df: pd.DataFrame,
    timeframe: str = "M1",
    method: str = "median",
    verbose: bool = True,
) -> int:
    """
    Compute a rounded number of ticks per period, used for dynamic bar sizing.

    The tick data is resampled to ``timeframe``, the rows in every period are
    counted, and the counts are reduced with ``np.<method>``. The result is
    rounded to its leading digit and floored at 10.

    Args:
        df (pd.DataFrame): Tick data with a datetime index.
        timeframe (str): Timeframe passed to ``set_resampling_freq``
            (e.g., 'M1').
        method (str): Name of the numpy reduction applied to the per-period
            counts; intended values are 'median' and 'mean'.
        verbose (bool): Whether to log the result.

    Returns:
        int: Rounded number of ticks per period (never below 10).

    Raises:
        ValueError: If ``df`` yields no periods (e.g. it is empty).
        AttributeError: If ``method`` is not an attribute of numpy.
    """
    freq = set_resampling_freq(timeframe)

    # size() counts every row in a period, not only the non-NaN values.
    ticks_per_bin = df.resample(freq).size().to_numpy()
    if ticks_per_bin.size == 0:
        # Without this guard the reduction yields NaN and int() fails with an
        # unhelpful "cannot convert float NaN to integer".
        raise ValueError("Cannot compute ticks per period from empty tick data")

    reducer = getattr(np, method)  # e.g. np.median / np.mean
    num_rounded = int(round(reducer(ticks_per_bin)))

    # Round to the leading digit, e.g. 1234 -> 1000, 87 -> 90.
    num_digits = len(str(num_rounded)) - 1
    rounded_ticks = int(round(num_rounded, -num_digits))
    rounded_ticks = max(10, rounded_ticks)  # 10 ticks is the minimum bar size

    if verbose:
        t0, t1 = (x.date() for x in df.index[[0, -1]])
        logger.info(
            f"{method.title()} {timeframe} ticks = {num_rounded:,} -> "
            f"{rounded_ticks:,} ({t0} to {t1})"
        )

    return rounded_ticks


def _make_bar_type_grouper(
    df: pd.DataFrame,
    bar_type: str = "tick",
    bar_size: Union[int, str] = 100,
) -> tuple[Any, Union[int, str], Union[np.ndarray, pd.Series, None]]:
    """
    Create a grouped object for aggregating tick data into time/tick/dollar/volume bars.

    Args:
        df: DataFrame with tick data. It must have a DatetimeIndex or a
            'time' column that can be used as the index.
        bar_type: Type of bar ('time', 'tick', 'dollar', 'volume').
        bar_size:
            - Timeframe for resampling (e.g., 'H1', 'D1', 'W1') for time bars.
            - Number of ticks/dollars/volume per bar for the other bar types.
              For tick bars a timeframe string triggers dynamic sizing via
              ``calculate_ticks_per_period``.

    Returns:
        A 3-tuple ``(group, bar_size, bar_id)``:
            - group: Resampler (time bars) or GroupBy (other bars).
            - bar_size: The bar size actually used (resolved to an int for
              dynamically sized tick bars).
            - bar_id: Per-tick bar identifier, or None for time bars.

    Raises:
        TypeError: If the index is not a DatetimeIndex and there is no
            'time' column to fall back on.
        KeyError: If a column required by the bar type is missing.
        NotImplementedError: If ``bar_type`` is unknown or ``bar_size`` is
            not a positive integer for a non-time bar.
    """
    df = df.copy(deep=False)  # shallow copy: a column is added below

    # Ensure DatetimeIndex
    if not isinstance(df.index, pd.DatetimeIndex):
        try:
            df = df.set_index("time")
        except KeyError as e:
            raise TypeError("Could not set 'time' as index") from e

    # Sort if needed
    if not df.index.is_monotonic_increasing:
        df = df.sort_index()

    # Time bars
    if bar_type == "time":
        freq = set_resampling_freq(bar_size)
        if freq.startswith(("B", "W")):
            # Business-day / weekly frequencies keep pandas' default binning.
            bar_group = df.resample(freq)
        else:
            bar_group = df.resample(freq, closed="left", label="right")
        return bar_group, bar_size, None

    # Dynamic bar sizing
    if bar_type == "tick" and isinstance(bar_size, str):
        bar_size = calculate_ticks_per_period(df, bar_size)

    if not isinstance(bar_size, (int, np.integer)):
        raise NotImplementedError(
            f"{bar_type} bars require integer bar_size, but you input '{bar_size}'"
        )
    bar_size = int(bar_size)  # normalise numpy integers
    if bar_size <= 0:
        # Zero would divide by zero; a negative size yields meaningless ids.
        raise NotImplementedError(f"{bar_type} bars require positive bar_size")

    # Non-time bars: keep timestamps as a column so the last one per bar can
    # be used as the bar's end time.
    df["time"] = df.index

    if bar_type == "tick":
        bar_id = np.arange(len(df)) // bar_size
    elif bar_type in ("volume", "dollar"):
        if "volume" not in df.columns:
            raise KeyError(f"'volume' column required for {bar_type} bars")
        if bar_type == "dollar":
            if "bid" not in df.columns:
                raise KeyError("'bid' column required for dollar bars")
            cum_metric = df["volume"] * df["bid"]
        else:
            cum_metric = df["volume"]
        # A tick belongs to bar k while the running total is in
        # [k * bar_size, (k + 1) * bar_size).
        bar_id = (cum_metric.cumsum() // bar_size).astype(int)
    else:
        raise NotImplementedError(f"{bar_type} bars not implemented")

    return df.groupby(bar_id), bar_size, bar_id


def make_bars(
    tick_df: pd.DataFrame,
    bar_type: str = "tick",
    bar_size: Union[int, str] = 100,
    price: str = "mid_price",
    tick_num: bool = True,
    verbose: bool = False,
) -> pd.DataFrame:
    """
    Constructs OHLC bars from tick data.

    Args:
        tick_df (pd.DataFrame): Tick data with a DatetimeIndex (or a 'time'
            column). Must contain bid/ask, price, or mid_price columns.
        bar_type (str): Bar type ('tick', 'time', 'volume', 'dollar').
        bar_size (int | str): Timeframe string for time bars; for other bars
            the size per bar. For tick bars a timeframe string triggers
            dynamic sizing.
        price (str): Price field strategy ('bid', 'ask', 'mid_price', 'bid_ask').
        tick_num (bool): If True, add a 'tick_num' column holding the 1-based
            number of the tick on which each bar was completed.
        verbose (bool): Logs runtime details if True.

    Returns:
        pd.DataFrame: OHLC bars with spread, spread_bps, tick_volume and,
        when available, volume. The trailing incomplete bar of non-time bars
        is dropped. An empty DataFrame is returned for empty input.

    Raises:
        KeyError: If required price or volume columns are missing.
    """
    if tick_df.empty:
        logger.warning("Empty tick_df passed to make_bars. Returning empty DataFrame.")
        return pd.DataFrame()

    # rename() returns a new frame, so the caller's data is never mutated.
    tick_df = tick_df.rename(
        columns={
            "Price": "price",
            "Volume": "volume",
            "Bid": "bid",
            "Ask": "ask",
        }
    )

    # Only a column subset is handed to the grouper, so a 'time' column must
    # be promoted to the index here or it would be lost.
    if not isinstance(tick_df.index, pd.DatetimeIndex) and "time" in tick_df.columns:
        tick_df = tick_df.set_index("time")

    if {"bid", "ask"}.issubset(tick_df.columns):
        tick_df["mid_price"] = (tick_df["bid"] + tick_df["ask"]) / 2
    elif "price" in tick_df.columns:
        tick_df["mid_price"] = tick_df["price"]
        tick_df["bid"] = tick_df["price"]
        tick_df["ask"] = tick_df["price"]
    elif "mid_price" not in tick_df.columns:
        raise KeyError("Tick data must contain either bid/ask, price, or mid_price columns")
    else:
        # Mid-price-only data: mirror the 'price' branch so the spread and
        # dollar-value computations below have bid/ask to work with.
        for side in ("bid", "ask"):
            if side not in tick_df.columns:
                tick_df[side] = tick_df["mid_price"]

    if "spread" not in tick_df.columns:
        tick_df["spread"] = tick_df["ask"] - tick_df["bid"]
    tick_df["spread_bps"] = tick_df["spread"] / tick_df["mid_price"] * 10000

    price_cols = ["bid", "ask"] if price == "bid_ask" else [price]
    price_cols += ["spread", "spread_bps"]

    if bar_type in ("volume", "dollar") and "volume" not in tick_df:
        raise KeyError(f"'volume' column required for {bar_type} bars")
    if "volume" in tick_df:
        price_cols.append("volume")

    # Dollar bars are sized on volume * bid, so 'bid' must reach the grouper
    # even when the OHLC itself is built from another price column.
    group_cols = list(price_cols)
    if bar_type == "dollar" and "bid" not in group_cols:
        group_cols.append("bid")

    bar_group, bar_size, bar_id = _make_bar_type_grouper(
        tick_df[group_cols], bar_type, bar_size
    )

    if price != "bid_ask":
        ohlc_df = bar_group[price].ohlc()
    else:
        ohlc_df = bar_group.agg({k: "ohlc" for k in ("bid", "ask")})
        ohlc_df = flatten_column_names(ohlc_df)
        # Make OHLC using mid-price: average the bid and ask legs.
        for col in ["open", "high", "low", "close"]:
            ohlc_df[col] = ohlc_df.filter(regex=col).sum(axis=1).div(2)

    ohlc_df["spread"] = bar_group["spread"].mean()
    ohlc_df["spread_bps"] = bar_group["spread_bps"].mean()
    ohlc_df["tick_volume"] = bar_group.size() if bar_type != "tick" else bar_size

    if "volume" in tick_df.columns:
        ohlc_df["volume"] = bar_group["volume"].sum()

    if bar_type == "time":
        eq_zero = ohlc_df["tick_volume"] == 0
        nrows = ohlc_df.shape[0]  # total before dropping, for the log ratio
        nzeros = eq_zero.sum()
        ohlc_df = ohlc_df.loc[~eq_zero].copy()
        if nzeros > 0:
            msg = f"{nzeros:,} of {nrows:,} ({nzeros / nrows:.2%}) rows with zero tick volume."
            logger.info(f"Dropped {msg}")
        if tick_num:
            ohlc_df["tick_num"] = ohlc_df["tick_volume"].cumsum()  # 1-based index
    else:
        # Ensure end time is after last tick
        ohlc_df.index = bar_group["time"].last() + pd.Timedelta(microseconds=1)

        # A tick bar is complete only when the tick count divides evenly.
        # The last volume/dollar bar is never known to be complete, because a
        # threshold bar is closed by the first tick of the following bar.
        last_complete = bar_type == "tick" and len(tick_df) % bar_size == 0
        if not last_complete:
            ohlc_df = ohlc_df.iloc[:-1].copy()
        if tick_num:
            ohlc_df["tick_num"] = _get_bar_tick_indices(
                tick_df, bar_size, bar_id, include_last=last_complete
            )

    try:
        ohlc_df = ohlc_df.tz_convert(None)  # Remove timezone information from index
    except TypeError:
        logger.warning(
            "The tick data used to construct 'ohlc_df' lacks timezone information; "
            "skipping tz conversion. "
            "Ensure source data is timezone-aware to avoid downstream ambiguity."
        )

    ohlc_df = optimize_dtypes(ohlc_df)  # Save memory

    if verbose:
        bar_info = (
            f"{bar_type}-{bar_size:,}" if (bar_type != "time") else f"{bar_size.upper()}"
        )
        logger.info(f"{bar_info} bars contain {ohlc_df.shape[0]:,} rows.")
        logger.info(f"Tick data contains {tick_df.shape[0]:,} rows.")
        log_df_info(ohlc_df)

    return ohlc_df


def _get_bar_tick_indices(
    tick_df,
    bar_size,
    bar_id,
    include_last: Optional[bool] = None,
) -> np.ndarray:
    """
    Return the 1-based tick number on which each bar was completed.

    Args:
        tick_df (pd.DataFrame): Tick data; only its length is used.
        bar_size (int): Bar size; only used for the default ``include_last``
            rule.
        bar_id (array-like): Non-decreasing bar identifier per tick.
        include_last (bool | None): Whether the final bar counts as complete
            and gets an entry. When None, the final bar is included only if
            the tick count is a positive multiple of ``bar_size``.

    Returns:
        np.ndarray: One entry per completed bar, in bar order, holding the
        1-based position of that bar's last tick.
    """
    n_ticks = len(tick_df)
    if include_last is None:
        include_last = n_ticks > 0 and n_ticks % bar_size == 0

    # A bar ends wherever the id increases. The 0-based position of the next
    # bar's first tick equals the 1-based position of this bar's last tick.
    ids = np.asarray(bar_id)
    bar_ends = np.flatnonzero(np.diff(ids) > 0) + 1

    if include_last and n_ticks > 0:
        bar_ends = np.append(bar_ends, n_ticks)

    return bar_ends