| | import numpy as np
|
| | import pandas as pd
|
| | from datetime import datetime, time
|
| |
|
| | class MarketProfile:
|
| | def __init__(self, multiplier=2.0):
|
| | self.multiplier = multiplier
|
| | self.counts = {}
|
| | self.total_ticks = 0
|
| | self.min_price = float('inf')
|
| | self.max_price = float('-inf')
|
| |
|
| | def reset(self):
|
| | self.counts = {}
|
| | self.total_ticks = 0
|
| | self.min_price = float('inf')
|
| | self.max_price = float('-inf')
|
| |
|
| | def fill_gaps(self, prices: np.ndarray, timestamps_ns: np.ndarray, step_sizes: np.ndarray):
|
| | """
|
| | Vectorised gap-fill with dynamic step sizes.
|
| | step_sizes: array of shape (N,) corresponding to each price point.
|
| | We use step_sizes[:-1] for the gaps starting at prices[:-1].
|
| | Returns: (filled_prices, filled_timestamps_ns)
|
| | """
|
| | if len(prices) < 2:
|
| | return prices, timestamps_ns
|
| |
|
| |
|
| |
|
| | if np.isscalar(step_sizes):
|
| |
|
| | steps_interval = np.full(len(prices)-1, step_sizes, dtype=np.float64)
|
| | else:
|
| |
|
| | steps_interval = step_sizes[:-1]
|
| |
|
| |
|
| | steps_interval = np.where(steps_interval < 0.000001, 0.01, steps_interval)
|
| |
|
| | diff = np.diff(prices)
|
| |
|
| | diff_units = np.round(diff / steps_interval).astype(np.int64)
|
| | counts = np.abs(diff_units)
|
| |
|
| |
|
| | counts = np.append(counts, 1)
|
| |
|
| | total = int(np.sum(counts))
|
| | if total == 0:
|
| | return prices, timestamps_ns
|
| |
|
| | indices = np.repeat(np.arange(len(prices)), counts)
|
| |
|
| |
|
| | cum = np.cumsum(counts)
|
| | starts = np.empty_like(cum)
|
| | starts[0] = 0
|
| | starts[1:] = cum[:-1]
|
| | offsets = np.arange(total) - np.repeat(starts, counts)
|
| |
|
| |
|
| | directions = np.zeros(len(prices), dtype=np.float64)
|
| | directions[:-1] = np.sign(diff_units)
|
| |
|
| |
|
| |
|
| | dt = np.zeros(len(prices), dtype=np.float64)
|
| | dt[:-1] = np.diff(timestamps_ns).astype(np.float64)
|
| |
|
| |
|
| | div_counts = np.where(counts > 0, counts, 1)
|
| | time_steps = dt / div_counts
|
| |
|
| |
|
| | if np.isscalar(step_sizes):
|
| | expanded_steps = np.full(len(indices), step_sizes, dtype=np.float64)
|
| | else:
|
| | expanded_steps = step_sizes[indices]
|
| |
|
| | expanded_time_steps = time_steps[indices]
|
| |
|
| |
|
| | filled_prices = prices[indices] + offsets * directions[indices] * expanded_steps
|
| | filled_ts = timestamps_ns[indices].astype(np.float64) + offsets * expanded_time_steps
|
| |
|
| | return np.round(filled_prices, 2), filled_ts.astype(np.int64)
|
| |
|
| | def update(self, ticks_df: pd.DataFrame):
|
| | """
|
| | Updates the profile with new ticks.
|
| | ticks_df must have 'bid', 'ask', 'datetime'.
|
| | """
|
| | if ticks_df.empty:
|
| | return
|
| |
|
| | timestamps_ns = ticks_df['datetime'].values.astype('datetime64[ns]').astype(np.int64)
|
| | bids = ticks_df['bid'].values.astype(np.float64)
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | if 'ask' in ticks_df.columns:
|
| | asks = ticks_df['ask'].values.astype(np.float64)
|
| | spreads = asks - bids
|
| |
|
| | spreads = np.maximum(spreads, 0.00001)
|
| | step_sizes = spreads * self.multiplier
|
| |
|
| |
|
| | self.add_data(bids, timestamps_ns, step_sizes)
|
| |
|
| | self.add_data(asks, timestamps_ns, step_sizes)
|
| |
|
| | else:
|
| |
|
| | step_sizes = np.full(len(bids), 0.01 * self.multiplier)
|
| | self.add_data(bids, timestamps_ns, step_sizes)
|
| |
|
| | def add_data(self, prices: np.ndarray, timestamps_ns: np.ndarray, step_sizes: np.ndarray):
|
| | """
|
| | Gap-fills the data and updates the histogram counts.
|
| | """
|
| | filled_prices, filled_ts = self.fill_gaps(prices, timestamps_ns, step_sizes)
|
| |
|
| |
|
| | unique, counts = np.unique(filled_prices, return_counts=True)
|
| |
|
| | for p, c in zip(unique, counts):
|
| | p = round(float(p), 2)
|
| | self.counts[p] = self.counts.get(p, 0) + c
|
| | self.total_ticks += c
|
| | if p < self.min_price: self.min_price = p
|
| | if p > self.max_price: self.max_price = p
|
| |
|
| | def get_vah_val_poc(self):
|
| | """
|
| | Calculates Value Area High (VAH), Value Area Low (VAL), and Point of Control (POC).
|
| | Standard definition: 70% of volume around POC.
|
| | """
|
| | if not self.counts:
|
| | return None, None, None
|
| |
|
| |
|
| | sorted_prices = sorted(self.counts.keys())
|
| | counts_list = [self.counts[p] for p in sorted_prices]
|
| |
|
| | counts_array = np.array(counts_list, dtype=np.int64)
|
| | prices_array = np.array(sorted_prices, dtype=np.float64)
|
| |
|
| |
|
| | poc_idx = np.argmax(counts_array)
|
| | poc_price = prices_array[poc_idx]
|
| |
|
| |
|
| | total_count = np.sum(counts_array)
|
| | target_count = total_count * 0.70
|
| |
|
| | current_count = counts_array[poc_idx]
|
| | left_idx = poc_idx
|
| | right_idx = poc_idx
|
| |
|
| |
|
| | while current_count < target_count:
|
| |
|
| | can_go_left = left_idx > 0
|
| | can_go_right = right_idx < len(counts_array) - 1
|
| |
|
| | if not can_go_left and not can_go_right:
|
| | break
|
| |
|
| | count_left = counts_array[left_idx - 1] if can_go_left else -1
|
| | count_right = counts_array[right_idx + 1] if can_go_right else -1
|
| |
|
| | if count_left > count_right:
|
| | current_count += count_left
|
| | left_idx -= 1
|
| | elif count_right > count_left:
|
| | current_count += count_right
|
| | right_idx += 1
|
| | else:
|
| |
|
| | if can_go_left:
|
| | current_count += count_left
|
| | left_idx -= 1
|
| | if can_go_right:
|
| | current_count += count_right
|
| | right_idx += 1
|
| |
|
| | val_price = prices_array[left_idx]
|
| | vah_price = prices_array[right_idx]
|
| |
|
| | return vah_price, val_price, poc_price
|
| |
|