File size: 9,560 Bytes
669d6a1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
from typing import Union

import numpy as np
import pandas as pd
from loguru import logger

from ..util.misc import (
    flatten_column_names,
    log_df_info,
    optimize_dtypes,
    set_resampling_freq,
)


def calculate_ticks_per_period(
    df: pd.DataFrame,
    timeframe: str = "M1",
    method: str = "median",
    verbose: bool = True,
) -> int:
    """
    Estimate a typical tick count per period for dynamic bar sizing.

    The ticks are bucketed into ``timeframe`` periods, the per-period row
    counts are summarised with the NumPy function named by ``method``, and the
    summary is rounded to the power of ten matching its magnitude, with a
    floor of 10.

    Args:
        df (pd.DataFrame): Tick data with a datetime index.
        timeframe (str): Timeframe using MetaTrader5 convention (e.g., 'M1').
        method (str): Name of the NumPy aggregation to apply ('median' or 'mean').
        verbose (bool): Whether to log the result.

    Returns:
        int: Rounded number of ticks per period (never below 10).
    """
    # size() counts every row in a period, not just the non-NaN values.
    # NOTE(review): periods without ticks presumably show up as zero counts and
    # pull the statistic down - confirm that this is intended.
    period_counts = df.resample(set_resampling_freq(timeframe)).size().values
    aggregate = getattr(np, method)
    num_rounded = int(round(aggregate(period_counts)))

    # Keep only the leading order of magnitude, e.g. 1,234 -> 1,000.
    magnitude = len(str(num_rounded)) - 1
    rounded_ticks = int(round(num_rounded, -magnitude))
    if rounded_ticks < 10:
        rounded_ticks = 10  # 10 ticks is the minimum bar size

    if not verbose:
        return rounded_ticks

    t0 = df.index[0].date()
    t1 = df.index[-1].date()
    logger.info(
        f"{method.title()} {timeframe} ticks = {num_rounded:,} -> "
        f"{rounded_ticks:,} ({t0} to {t1})"
    )
    return rounded_ticks


def _make_bar_type_grouper(
    df: pd.DataFrame,
    bar_type: str = "tick",
    bar_size: Union[int, str] = 100,
) -> tuple[object, Union[int, str], Union[np.ndarray, pd.Series, None]]:
    """
    Create a grouped object for aggregating tick data into time/tick/dollar/volume bars.

    Args:
        df: DataFrame with tick data. If the index is not a DatetimeIndex, the
            'time' column is promoted to the index. The caller's frame is not
            modified; all changes are applied to a shallow copy.
        bar_type: Type of bar ('time', 'tick', 'dollar', 'volume').
        bar_size:
            - Timeframe for resampling (e.g., 'H1', 'D1', 'W1') for time bars.
            - Positive number of ticks/dollars/volume per bar for the other
              bar types. For tick bars a timeframe string is also accepted, in
              which case the size is derived with `calculate_ticks_per_period`.

    Returns:
        A 3-tuple ``(bar_group, bar_size, bar_id)``:
        - bar_group: pandas resampler (time bars) or group-by object (other
          bar types) ready for aggregation.
        - bar_size: the input timeframe for time bars, otherwise the integer
          bar size actually used.
        - bar_id: per-tick bar identifier (array for tick bars, Series for
          volume/dollar bars), or None for time bars.

    Raises:
        TypeError: If the index is not datetime-based and there is no 'time' column.
        NotImplementedError: If bar_size is not a positive integer for
            non-time bars, or bar_type is unknown.
        KeyError: If a column needed for volume/dollar bars is missing.
    """
    # Shallow copy: re-indexing and the helper column below stay local.
    df = df.copy(deep=False)

    # Ensure DatetimeIndex
    if not isinstance(df.index, pd.DatetimeIndex):
        try:
            df.set_index("time", inplace=True)
        except KeyError as e:
            raise TypeError("Could not set 'time' as index") from e

    # Sort if needed
    if not df.index.is_monotonic_increasing:
        df.sort_index(inplace=True)

    # Time bars
    if bar_type == "time":
        freq = set_resampling_freq(bar_size)
        bar_group = (
            df.resample(freq, closed="left", label="right")
            if not freq.startswith(("B", "W"))
            else df.resample(freq)
        )
        return bar_group, bar_size, None

    # Dynamic bar sizing
    if bar_type == "tick" and isinstance(bar_size, str):
        bar_size = calculate_ticks_per_period(df, bar_size)

    # NumPy integers (e.g. a size computed with numpy) are valid sizes too;
    # normalise them so the checks and callers below see a plain int.
    if isinstance(bar_size, np.integer):
        bar_size = int(bar_size)

    if not isinstance(bar_size, int):
        raise NotImplementedError(
            f"{bar_type} bars require integer bar_size, but you input '{bar_size}'"
        )
    if bar_size == 0:
        raise NotImplementedError(f"{bar_type} bars require non-zero bar_size")
    if bar_size < 0:
        # Floor division by a negative size would yield meaningless bar ids.
        raise NotImplementedError(
            f"{bar_type} bars require positive bar_size, but you input '{bar_size}'"
        )

    # Non-time bars: expose the timestamps as a column so they can be aggregated.
    df["time"] = df.index

    if bar_type == "tick":
        bar_id = np.arange(len(df)) // bar_size
    elif bar_type in ("volume", "dollar"):
        if "volume" not in df.columns:
            raise KeyError(f"'volume' column required for {bar_type} bars")

        if bar_type == "dollar":
            # Prefer the bid (original behaviour) but fall back to another
            # price column instead of failing with a bare KeyError: 'bid'.
            price_col = next(
                (c for c in ("bid", "mid_price", "price") if c in df.columns),
                None,
            )
            if price_col is None:
                raise KeyError(
                    "'bid', 'mid_price' or 'price' column required for dollar bars"
                )
            cum_metric = df["volume"] * df[price_col]
        else:
            cum_metric = df["volume"]

        # A new bar id starts each time the running total crosses a multiple
        # of bar_size.
        cumsum = cum_metric.cumsum()
        bar_id = (cumsum // bar_size).astype(int)
    else:
        raise NotImplementedError(f"{bar_type} bars not implemented")

    return df.groupby(bar_id), bar_size, bar_id


def make_bars(
    tick_df: pd.DataFrame,
    bar_type: str = "tick",
    bar_size: Union[int, str] = 100,
    price: str = "mid_price",
    tick_num: bool = True,
    verbose: bool = False,
) -> pd.DataFrame:
    """
    Construct OHLC bars from tick data.

    Args:
        tick_df (pd.DataFrame): Tick data. Must contain bid/ask, price, or
            mid_price columns (capitalised 'Price', 'Volume', 'Bid', 'Ask'
            are accepted and renamed). The input frame is not modified.
        bar_type (str): Bar type ('tick', 'time', 'volume', 'dollar').
        bar_size (int | str): Timeframe string for time bars; number of
            ticks/volume/dollars per bar otherwise. For tick bars a timeframe
            string triggers dynamic sizing.
        price (str): Price field strategy ('bid', 'ask', 'mid_price', 'bid_ask').
        tick_num (bool): If True, add a 'tick_num' column with the 1-based
            number of the tick on which each bar was formed.
        verbose (bool): Logs runtime details if True.

    Returns:
        pd.DataFrame: OHLC bars with 'spread', 'spread_bps', 'tick_volume' and,
        when available/requested, 'volume' and 'tick_num'. An empty DataFrame
        is returned for empty input.

    Raises:
        KeyError: If no usable price column exists, or 'volume' is missing for
            volume/dollar bars.
    """
    if tick_df.empty:
        logger.warning("Empty tick_df passed to make_bars. Returning empty DataFrame.")
        return pd.DataFrame()

    tick_df = tick_df.copy()
    tick_df.rename(
        columns={
            "Price": "price",
            "Volume": "volume",
            "Bid": "bid",
            "Ask": "ask",
        },
        inplace=True,
    )

    if {"bid", "ask"}.issubset(tick_df.columns):
        tick_df["mid_price"] = (tick_df["bid"] + tick_df["ask"]) / 2
    elif "price" in tick_df.columns:
        tick_df["mid_price"] = tick_df["price"]
        tick_df["bid"] = tick_df["price"]
        tick_df["ask"] = tick_df["price"]
    elif "mid_price" in tick_df.columns:
        # Only a mid price is available: treat it as both sides of the book,
        # exactly as the 'price' branch does, so the spread columns below can
        # still be derived instead of failing with KeyError: 'ask'.
        if "bid" not in tick_df.columns and "ask" not in tick_df.columns:
            tick_df["bid"] = tick_df["mid_price"]
            tick_df["ask"] = tick_df["mid_price"]
    else:
        raise KeyError("Tick data must contain either bid/ask, price, or mid_price columns")

    if "spread" not in tick_df.columns:
        tick_df["spread"] = tick_df["ask"] - tick_df["bid"]
        tick_df["spread_bps"] = tick_df["spread"] / tick_df["mid_price"] * 10000
    elif "spread_bps" not in tick_df.columns:
        # A supplied spread without its bps counterpart would otherwise fail
        # on the column selection below.
        tick_df["spread_bps"] = tick_df["spread"] / tick_df["mid_price"] * 10000

    price_cols = ["bid", "ask"] if price == "bid_ask" else [price]
    price_cols += ["spread", "spread_bps"]
    if bar_type in ("volume", "dollar") and "volume" not in tick_df:
        raise KeyError(f"'volume' column required for {bar_type} bars")
    if "volume" in tick_df:
        price_cols.append("volume")

    # Dollar bars weight volume by the bid, so the bid must reach the grouper
    # even when the bars themselves are built from another price field.
    group_cols = list(price_cols)
    if bar_type == "dollar" and "bid" in tick_df.columns and "bid" not in group_cols:
        group_cols.append("bid")

    bar_group, bar_size, bar_id = _make_bar_type_grouper(
        tick_df[group_cols], bar_type, bar_size
    )

    if price != "bid_ask":
        ohlc_df = bar_group[price].ohlc()
    else:
        ohlc_df = bar_group.agg({k: "ohlc" for k in ("bid", "ask")})
        ohlc_df = flatten_column_names(ohlc_df)
        # Make OHLC using mid-price: average the bid and ask column of each field
        for col in ["open", "high", "low", "close"]:
            ohlc_df[col] = ohlc_df.filter(regex=col).sum(axis=1).div(2)

    ohlc_df["spread"] = bar_group["spread"].mean()
    ohlc_df["spread_bps"] = bar_group["spread_bps"].mean()
    ohlc_df["tick_volume"] = bar_group.size() if bar_type != "tick" else bar_size

    if "volume" in tick_df.columns:
        ohlc_df["volume"] = bar_group["volume"].sum()

    if bar_type == "time":
        eq_zero = ohlc_df["tick_volume"] == 0
        nzeros = eq_zero.sum()
        if nzeros > 0:
            # Report against the row count BEFORE dropping, so the share is
            # the fraction of all periods that were empty.
            nrows = ohlc_df.shape[0]
            msg = f"{nzeros:,} of {nrows:,} ({nzeros / nrows:.2%}) rows with zero tick volume."
            logger.info(f"Dropped {msg}")

        # Explicit copy so the column assignment below targets an independent
        # frame (avoids pandas' SettingWithCopyWarning).
        ohlc_df = ohlc_df[~eq_zero].copy()

        if tick_num:
            ohlc_df["tick_num"] = ohlc_df["tick_volume"].cumsum()  # 1-based index

    else:
        ohlc_df.index = bar_group["time"].last() + pd.Timedelta(
            microseconds=1
        )  # Ensure end time is after last tick

        # Drop the trailing bar when the tick count is not a multiple of bar_size.
        # NOTE(review): for volume/dollar bars bar_size is not a tick count, so
        # this test looks tick-bar specific; it is kept because
        # _get_bar_tick_indices applies the same rule - confirm intent.
        if len(tick_df) % bar_size > 0:
            ohlc_df = ohlc_df.iloc[:-1].copy()

        if tick_num:
            ohlc_df["tick_num"] = _get_bar_tick_indices(tick_df, bar_size, bar_id)

    try:
        ohlc_df = ohlc_df.tz_convert(None)  # Remove timezone information from index
    except TypeError:
        logger.warning(
            "The tick data used to construct 'ohlc_df' lacks timezone information; "
            "skipping tz conversion. "
            "Ensure source data is timezone-aware to avoid downstream ambiguity."
        )

    ohlc_df = optimize_dtypes(ohlc_df)  # Save memory

    if verbose:
        bar_info = (
            f"{bar_type}-{bar_size:,}"
            if (bar_type != "time")
            else f"{bar_size.upper()}"
        )
        logger.info(f"{bar_info} bars contain {ohlc_df.shape[0]:,} rows.")
        logger.info(f"Tick data contains {tick_df.shape[0]:,} rows.")
        log_df_info(ohlc_df)

    return ohlc_df


def _get_bar_tick_indices(tick_df, bar_size, bar_id) -> np.ndarray:
    """
    Return the 1-based tick number on which each bar was completed.

    Parameters
    ----------
    tick_df : pd.DataFrame
        Tick data; only its length is used here.
    bar_size : int
        Bar size. The final bar is treated as complete only when
        ``len(tick_df)`` is an exact multiple of ``bar_size``.
    bar_id : array-like of int
        Bar identifier of every tick, in tick order.

    Returns
    -------
    np.ndarray
        Positional array (one entry per completed bar) holding the 1-based
        number of the last tick of that bar. It carries no index of its own.
    """
    n_ticks = len(tick_df)

    # A bar starts wherever bar_id increases; prepend=-1 makes the very first
    # tick register as a start as well.
    diff = np.diff(np.asarray(bar_id), prepend=-1)
    boundary_indices = np.where(diff > 0)[0]

    # The tick just before each start is the last tick of the previous bar
    # (this yields -1 for the first start, which is filtered out below).
    last_indices = boundary_indices - 1

    # The final bar has no following start; add it only if it is complete.
    if n_ticks % bar_size == 0 and n_ticks > 0:
        last_indices = np.append(last_indices, n_ticks - 1)

    # Drop the -1 placeholder and convert to 1-based tick numbers.
    last_indices = last_indices[last_indices >= 0] + 1

    return last_indices