import numpy as np
import pandas as pd
from datetime import datetime, time


class MarketProfile:
    """Tick-count market profile built from gap-filled bid/ask prices.

    Consecutive prices are linearly "gap-filled" so that every price level
    traversed between two ticks receives a count. The resulting histogram
    (price -> count) is used to derive the Point of Control and the Value
    Area.
    """

    # Steps below this threshold are treated as invalid and replaced.
    _MIN_STEP = 0.000001
    # Replacement step used when a supplied step is below ``_MIN_STEP``.
    _FALLBACK_STEP = 0.01
    # Price levels are bucketed by rounding to this many decimals.
    _PRICE_DECIMALS = 2
    # Fraction of the total count that the Value Area must cover.
    _VALUE_AREA_FRACTION = 0.70

    def __init__(self, multiplier=2.0):
        """Create an empty profile.

        Args:
            multiplier: Factor applied to the bid/ask spread to obtain the
                gap-fill step size in ``update``.
        """
        self.multiplier = multiplier
        self.reset()

    def reset(self):
        """Clear the histogram and all running statistics."""
        self.counts = {}  # price -> count (time/tick opportunity)
        self.total_ticks = 0
        self.min_price = float('inf')
        self.max_price = float('-inf')

    def fill_gaps(self, prices: np.ndarray, timestamps_ns: np.ndarray,
                  step_sizes: np.ndarray):
        """Vectorised gap-fill with dynamic step sizes.

        For every consecutive pair ``prices[i] -> prices[i + 1]`` the segment
        emits ``prices[i]`` followed by intermediate levels spaced
        ``step_sizes[i]`` apart (the end point is emitted by the next
        segment). Timestamps are interpolated linearly across each segment.

        Args:
            prices: Price series of length N.
            timestamps_ns: Integer nanosecond timestamps, length N.
            step_sizes: Either a scalar step, or an array of length N where
                ``step_sizes[i]`` is the step for the gap ``i -> i + 1``
                (the last entry is unused for gap sizing).

        Returns:
            ``(filled_prices, filled_timestamps_ns)``. Prices are rounded to
            two decimals; timestamps are int64. Inputs shorter than two
            points are returned unchanged.

        Raises:
            ValueError: If an array ``step_sizes`` does not have length N.
        """
        n_points = len(prices)
        if n_points < 2:
            return prices, timestamps_ns

        prices = np.asarray(prices, dtype=np.float64)
        base_ts = np.asarray(timestamps_ns).astype(np.int64)

        # One step per point; scalars are broadcast.
        if np.ndim(step_sizes) == 0:
            steps = np.full(n_points, float(step_sizes), dtype=np.float64)
        else:
            steps = np.asarray(step_sizes, dtype=np.float64)
            if steps.shape[0] != n_points:
                raise ValueError(
                    "step_sizes must be a scalar or have the same length as prices"
                )

        # Avoid division by zero or extremely small steps. The SAME sanitised
        # steps are used for counting and for filling so the two stay
        # consistent.
        steps = np.where(steps < self._MIN_STEP, self._FALLBACK_STEP, steps)

        # Signed number of steps needed to travel from point i to point i + 1.
        diff_units = np.round(np.diff(prices) / steps[:-1]).astype(np.int64)

        # Every point emits at least itself: a repeated price (or a move
        # smaller than half a step) must still be counted as one tick rather
        # than silently dropped. The last point always emits exactly itself.
        counts = np.ones(n_points, dtype=np.int64)
        counts[:-1] = np.maximum(np.abs(diff_units), 1)

        # Source point of every output sample and its offset (0, 1, 2, ...)
        # within that point's segment.
        indices = np.repeat(np.arange(n_points), counts)
        segment_starts = np.cumsum(counts) - counts
        offsets = np.arange(indices.size) - segment_starts[indices]

        # Direction per segment (+1, -1, or 0 when the price does not move).
        directions = np.zeros(n_points, dtype=np.float64)
        directions[:-1] = np.sign(diff_units)

        # Time increment per emitted sample within each segment.
        time_steps = np.zeros(n_points, dtype=np.float64)
        time_steps[:-1] = np.diff(base_ts).astype(np.float64) / counts[:-1]

        filled_prices = (
            prices[indices] + offsets * directions[indices] * steps[indices]
        )

        # Keep the base timestamp in int64 and add an integer delta: epoch
        # nanoseconds exceed float64's 53-bit mantissa, so interpolating in
        # floating point would corrupt even the original timestamps.
        ts_delta = (offsets * time_steps[indices]).astype(np.int64)
        filled_ts = base_ts[indices] + ts_delta

        return np.round(filled_prices, self._PRICE_DECIMALS), filled_ts

    def update(self, ticks_df: pd.DataFrame):
        """Update the profile with new ticks.

        Args:
            ticks_df: Frame with ``'bid'`` and ``'datetime'`` columns and an
                optional ``'ask'`` column. When ``'ask'`` is present both
                bid and ask series are added, each gap-filled with a step of
                ``spread * multiplier``; otherwise only bids are added with
                a step of ``0.01 * multiplier``.
        """
        if ticks_df.empty:
            return

        timestamps_ns = (
            ticks_df['datetime'].values.astype('datetime64[ns]').astype(np.int64)
        )
        bids = ticks_df['bid'].values.astype(np.float64)

        if 'ask' not in ticks_df.columns:
            # Fallback if there is no ask column: fixed step.
            step_sizes = np.full(len(bids), 0.01 * self.multiplier)
            self.add_data(bids, timestamps_ns, step_sizes)
            return

        asks = ticks_df['ask'].values.astype(np.float64)
        # Step = spread * multiplier, with the spread floored so that a zero
        # or negative spread still yields a positive step.
        spreads = np.maximum(asks - bids, 0.00001)
        step_sizes = spreads * self.multiplier

        self.add_data(bids, timestamps_ns, step_sizes)
        self.add_data(asks, timestamps_ns, step_sizes)

    def add_data(self, prices: np.ndarray, timestamps_ns: np.ndarray,
                 step_sizes: np.ndarray):
        """Gap-fill one price series and merge it into the histogram.

        Updates ``counts``, ``total_ticks``, ``min_price`` and ``max_price``.
        The interpolated timestamps are not used by the histogram.
        """
        filled_prices, _ = self.fill_gaps(prices, timestamps_ns, step_sizes)

        levels, level_counts = np.unique(filled_prices, return_counts=True)
        # ``tolist`` yields plain Python floats/ints so no numpy scalars leak
        # into the instance state.
        for level, count in zip(levels.tolist(), level_counts.tolist()):
            price = round(level, self._PRICE_DECIMALS)
            self.counts[price] = self.counts.get(price, 0) + count
            self.total_ticks += count
            if price < self.min_price:
                self.min_price = price
            if price > self.max_price:
                self.max_price = price

    def get_vah_val_poc(self):
        """Compute Value Area High, Value Area Low and Point of Control.

        The POC is the price with the highest count (lowest such price on
        ties). The Value Area grows outward from the POC, always taking the
        neighbouring level with the larger count (both on a tie), until it
        covers at least 70% of the total count or the profile is exhausted.

        Returns:
            ``(vah, val, poc)`` as floats, or ``(None, None, None)`` when the
            profile is empty.
        """
        if not self.counts:
            return None, None, None

        levels = sorted(self.counts)
        prices_array = np.array(levels, dtype=np.float64)
        counts_array = np.array([self.counts[p] for p in levels], dtype=np.int64)

        poc_idx = int(np.argmax(counts_array))
        target_count = np.sum(counts_array) * self._VALUE_AREA_FRACTION
        covered = counts_array[poc_idx]

        left_idx = right_idx = poc_idx
        last_idx = len(counts_array) - 1

        while covered < target_count:
            can_go_left = left_idx > 0
            can_go_right = right_idx < last_idx
            if not (can_go_left or can_go_right):
                break

            # -1 marks an unavailable side so it never wins the comparison.
            count_left = counts_array[left_idx - 1] if can_go_left else -1
            count_right = counts_array[right_idx + 1] if can_go_right else -1

            if count_left > count_right:
                covered += count_left
                left_idx -= 1
            elif count_right > count_left:
                covered += count_right
                right_idx += 1
            else:
                # Equal counts: expand both sides where possible.
                if can_go_left:
                    covered += count_left
                    left_idx -= 1
                if can_go_right:
                    covered += count_right
                    right_idx += 1

        return (
            float(prices_array[right_idx]),
            float(prices_array[left_idx]),
            float(prices_array[poc_idx]),
        )