# mBA-Terminal / src/core/market_profile.py
# (Hosting-page residue from the upload: "algorembrant's picture", "Upload 29 files", commit c99df4c verified.)
import numpy as np
import pandas as pd
from datetime import datetime, time
class MarketProfile:
    """Price histogram built from gap-filled tick data.

    Ticks are fed in through :meth:`update` (a DataFrame of bid/ask quotes) or
    :meth:`add_data` (raw arrays).  Between consecutive ticks the price path is
    interpolated in fixed steps by :meth:`fill_gaps`, and every interpolated
    price (rounded to 2 decimals) increments a counter in ``self.counts``.
    :meth:`get_vah_val_poc` then derives the point of control and the value
    area from that histogram.

    Attributes:
        multiplier: Factor applied to the bid/ask spread to obtain the
            gap-fill step size in :meth:`update`.
        counts: Mapping ``price -> count`` (time/tick opportunity).
        total_ticks: Sum of all counts added so far.
        min_price: Lowest price seen (``inf`` while empty).
        max_price: Highest price seen (``-inf`` while empty).
    """

    # Steps below this threshold (or non-finite steps) are considered unusable
    # and are replaced by the fallback step, so we never divide by ~0.
    _MIN_STEP = 0.000001
    _FALLBACK_STEP = 0.01

    def __init__(self, multiplier=2.0):
        """Create an empty profile.

        Args:
            multiplier: Spread multiplier used by :meth:`update` to derive the
                per-tick gap-fill step size.
        """
        self.multiplier = multiplier
        self.reset()

    def reset(self):
        """Discard all accumulated counts and price extremes."""
        self.counts = {}  # price -> count (time/tick opportunity)
        self.total_ticks = 0
        self.min_price = float('inf')
        self.max_price = float('-inf')

    def fill_gaps(self, prices: np.ndarray, timestamps_ns: np.ndarray, step_sizes: np.ndarray):
        """Vectorised gap-fill with dynamic step sizes.

        For each pair of consecutive points ``i -> i+1`` the price is walked
        from ``prices[i]`` towards ``prices[i+1]`` in increments of the step
        belonging to point ``i``; timestamps are interpolated linearly over
        the same number of samples.  The final point always contributes
        exactly one sample (itself).

        Note: a point whose move to the next point rounds to zero steps
        contributes no sample at all (its segment has length 0).

        Args:
            prices: Price series of length N.
            timestamps_ns: Timestamps of length N (integer nanoseconds).
            step_sizes: Either a scalar step, or a length-N sequence where
                ``step_sizes[i]`` is the step for the gap ``i -> i+1``.
                Steps that are non-finite or smaller than 1e-6 are replaced
                by 0.01, both for counting and for placing filled prices.

        Returns:
            ``(filled_prices, filled_timestamps_ns)``: prices rounded to 2
            decimals and int64 timestamps.  If fewer than two points are
            given, the inputs are returned unchanged.

        Raises:
            ValueError: If the input lengths disagree or ``prices`` contains
                NaN/inf (which cannot be converted to a step count).
        """
        if len(prices) < 2:
            return prices, timestamps_ns

        prices = np.asarray(prices, dtype=np.float64)
        ts = np.asarray(timestamps_ns).astype(np.int64)
        n_points = len(prices)

        if len(ts) != n_points:
            raise ValueError("prices and timestamps_ns must have the same length")
        if not np.all(np.isfinite(prices)):
            raise ValueError("prices must be finite")

        # One step per point; step i governs the gap i -> i+1.
        # np.ndim also recognises 0-d arrays, which np.isscalar does not.
        if np.ndim(step_sizes) == 0:
            steps = np.full(n_points, step_sizes, dtype=np.float64)
        else:
            steps = np.asarray(step_sizes, dtype=np.float64)
            if steps.shape != (n_points,):
                raise ValueError("step_sizes must be a scalar or match prices in length")

        # Replace unusable steps once, so that the step used to COUNT samples
        # is the same step used to PLACE them.
        usable = np.isfinite(steps) & (steps >= MarketProfile._MIN_STEP)
        steps = np.where(usable, steps, MarketProfile._FALLBACK_STEP)

        # Signed number of steps in every gap; the last point counts once.
        diff_units = np.round(np.diff(prices) / steps[:-1]).astype(np.int64)
        counts = np.append(np.abs(diff_units), 1)
        total = int(counts.sum())

        # indices[k] = source point of output sample k;
        # offsets[k] = position of sample k inside its segment (0, 1, 2, ...).
        indices = np.repeat(np.arange(n_points), counts)
        starts = np.concatenate(([0], np.cumsum(counts)[:-1]))
        offsets = np.arange(total) - starts[indices]

        # Direction per segment (+1, -1, or 0 for the final point).
        directions = np.zeros(n_points, dtype=np.float64)
        directions[:-1] = np.sign(diff_units)

        # Time advance per sample within each segment.  Zero-length segments
        # emit no samples, so their divisor only has to be non-zero.
        dt = np.zeros(n_points, dtype=np.float64)
        dt[:-1] = np.diff(ts)
        time_steps = dt / np.maximum(counts, 1)

        filled_prices = prices[indices] + offsets * directions[indices] * steps[indices]

        # Add the interpolated increment in integer space: nanosecond epoch
        # values exceed float64 precision, so converting the base timestamp
        # to float would corrupt it.
        increments = np.rint(offsets * time_steps[indices]).astype(np.int64)
        filled_ts = ts[indices] + increments

        return np.round(filled_prices, 2), filled_ts

    def update(self, ticks_df: pd.DataFrame):
        """Update the profile with new ticks.

        ``ticks_df`` must have ``'bid'`` and ``'datetime'`` columns; ``'ask'``
        is optional.  With an ask column, the per-tick step is
        ``max(ask - bid, 0.00001) * multiplier`` and both the bid and the ask
        series are added to the histogram.  Without it, only bids are added
        using a fixed step of ``0.01 * multiplier``.

        Args:
            ticks_df: Tick data; an empty frame is ignored.
        """
        if ticks_df.empty:
            return

        timestamps_ns = ticks_df['datetime'].values.astype('datetime64[ns]').astype(np.int64)
        bids = ticks_df['bid'].values.astype(np.float64)

        if 'ask' not in ticks_df.columns:
            # Fallback if no ask column: constant step.
            step_sizes = np.full(len(bids), 0.01 * self.multiplier)
            self.add_data(bids, timestamps_ns, step_sizes)
            return

        asks = ticks_df['ask'].values.astype(np.float64)
        # Floor the spread so crossed/locked quotes still give a positive step.
        spreads = np.maximum(asks - bids, 0.00001)
        step_sizes = spreads * self.multiplier

        self.add_data(bids, timestamps_ns, step_sizes)
        self.add_data(asks, timestamps_ns, step_sizes)

    def add_data(self, prices: np.ndarray, timestamps_ns: np.ndarray, step_sizes: np.ndarray):
        """Gap-fill the series and add the result to the histogram.

        Args:
            prices: Price series.
            timestamps_ns: Matching timestamps (integer nanoseconds).
            step_sizes: Scalar or per-point step sizes, see :meth:`fill_gaps`.
        """
        filled_prices, _ = self.fill_gaps(prices, timestamps_ns, step_sizes)

        levels, level_counts = np.unique(filled_prices, return_counts=True)
        # tolist() yields plain Python floats/ints, so the histogram and
        # total_ticks never hold NumPy scalar types.
        for price, count in zip(levels.tolist(), level_counts.tolist()):
            price = round(price, 2)
            self.counts[price] = self.counts.get(price, 0) + count
            self.total_ticks += count
            if price < self.min_price:
                self.min_price = price
            if price > self.max_price:
                self.max_price = price

    def get_vah_val_poc(self, value_area_pct=0.70):
        """Compute Value Area High (VAH), Value Area Low (VAL) and Point of Control (POC).

        The POC is the price with the highest count (the lowest such price on
        a tie).  Starting from the POC, the area is expanded greedily towards
        whichever neighbouring level has the larger count -- both sides when
        they are equal -- until it holds at least ``value_area_pct`` of the
        total count or the whole price range is covered.

        Args:
            value_area_pct: Fraction of the total count the value area must
                contain, in ``(0, 1]``.  Defaults to the conventional 70%.

        Returns:
            ``(vah, val, poc)`` as floats, or ``(None, None, None)`` when the
            profile is empty.

        Raises:
            ValueError: If ``value_area_pct`` is outside ``(0, 1]``.
        """
        if not 0.0 < value_area_pct <= 1.0:
            raise ValueError("value_area_pct must be in the interval (0, 1]")
        if not self.counts:
            return None, None, None

        sorted_prices = sorted(self.counts)
        prices_array = np.array(sorted_prices, dtype=np.float64)
        counts_array = np.array([self.counts[p] for p in sorted_prices], dtype=np.int64)

        poc_idx = int(np.argmax(counts_array))
        target_count = counts_array.sum() * value_area_pct

        current_count = counts_array[poc_idx]
        left_idx = right_idx = poc_idx
        last_idx = len(counts_array) - 1

        while current_count < target_count:
            can_go_left = left_idx > 0
            can_go_right = right_idx < last_idx
            if not (can_go_left or can_go_right):
                break

            # -1 marks an unavailable side; real counts are never negative.
            count_left = counts_array[left_idx - 1] if can_go_left else -1
            count_right = counts_array[right_idx + 1] if can_go_right else -1

            # A tie takes both neighbours; otherwise take the heavier side.
            if count_left >= count_right:
                current_count += count_left
                left_idx -= 1
            if count_right >= count_left:
                current_count += count_right
                right_idx += 1

        return (
            float(prices_array[right_idx]),
            float(prices_array[left_idx]),
            float(prices_array[poc_idx]),
        )