mBA-Terminal / src /core /data_worker.py
algorembrant's picture
Upload 29 files
c99df4c verified
import time
from datetime import datetime, timedelta, timezone
import pandas as pd
import numpy as np
from PyQt6.QtCore import QThread, pyqtSignal, QObject
from src.core.mt5_interface import MT5Interface
from src.core.market_profile import MarketProfile
class DataWorker(QThread):
# Signals
data_signal = pyqtSignal(object, object) # ticks_df, profile_counts
levels_signal = pyqtSignal(object, object, object, object) # times, vah, val, poc (can be arrays or scalars)
status_signal = pyqtSignal(str)
finished_signal = pyqtSignal()
def __init__(self, symbol, date_obj, multiplier=1.0):
super().__init__()
self.symbol = symbol
self.date_obj = date_obj
self.multiplier = multiplier
self.running = True
self.mt5_interface = MT5Interface()
self.market_profile = MarketProfile(multiplier=self.multiplier)
def run(self):
self.status_signal.emit(f"Connecting to MT5... (Multiplier: {self.multiplier}x)")
if not self.mt5_interface.initialize():
self.status_signal.emit("Failed to connect to MT5.")
self.finished_signal.emit()
return
# Calculate session times
# Target Date (00:00 UTC of the selected day)
target_date_utc = datetime(self.date_obj.year, self.date_obj.month, self.date_obj.day, tzinfo=timezone.utc)
# Establishment Start: 22:00 UTC previous day
start_establishment = target_date_utc - timedelta(days=1) + timedelta(hours=22)
# Establishment End (Developing Start): 00:00 UTC target day
end_establishment = target_date_utc
# Session End: 00:00 UTC next day (24h later)
end_session = target_date_utc + timedelta(days=1)
# Current time
now_utc = datetime.now(timezone.utc)
# Determine Fetch Range
is_historical = end_session < now_utc
fetch_end = end_session if is_historical else now_utc
self.status_signal.emit(f"Fetch Range: {start_establishment} to {fetch_end} ...")
# 1. Fetch History
ticks_df = self.mt5_interface.get_ticks(self.symbol, start_establishment, fetch_end)
if not ticks_df.empty:
# Split Data
mask_est = (ticks_df['datetime'] >= start_establishment) & (ticks_df['datetime'] < end_establishment)
df_est = ticks_df.loc[mask_est]
mask_dev = (ticks_df['datetime'] >= end_establishment)
df_dev = ticks_df.loc[mask_dev]
self.status_signal.emit(f"Data: {len(ticks_df)} total. Est: {len(df_est)}, Dev: {len(df_dev)}")
# 2. Process Establishment Phase
if not df_est.empty:
self.market_profile.update(df_est)
self.status_signal.emit(f"Profile Established. Ticks: {self.market_profile.total_ticks}")
else:
self.status_signal.emit("Warning: No Establishment Data (22:00-00:00). Starting empty.")
# 3. Process Developing Phase (History Replay)
dev_times = []
dev_vah = []
dev_val = []
dev_poc = []
if not df_dev.empty:
# Resample to 1 minute to calculate trajectory
df_dev_indexed = df_dev.set_index('datetime')
grouped = df_dev_indexed.resample('1min')
count_steps = 0
for time_idx, group in grouped:
if group.empty:
continue
# Update profile
group_reset = group.reset_index()
self.market_profile.update(group_reset)
# Calculate levels
v, l, p = self.market_profile.get_vah_val_poc()
if v is not None:
# Use timestamp
ts_float = time_idx.timestamp()
dev_times.append(ts_float)
dev_vah.append(v)
dev_val.append(l)
dev_poc.append(p)
count_steps += 1
self.status_signal.emit(f"Calculated {count_steps} developing points.")
# Emit History Data: Ticks
# If extremely large, maybe downsample? But for now send all.
print(f"DEBUG: Worker Emitting Ticks: {len(ticks_df)}")
self.data_signal.emit(ticks_df, self.market_profile.counts)
# Emit Levels
if dev_times:
print(f"DEBUG: Worker Emitting Levels: {len(dev_times)} pts. Times: {dev_times[0]} -> {dev_times[-1]}")
self.levels_signal.emit(
np.array(dev_times),
np.array(dev_vah),
np.array(dev_val),
np.array(dev_poc)
)
else:
print("DEBUG: No developing levels calculated to emit.")
self.status_signal.emit("No developing levels calculated (insufficient dev data?).")
else:
self.status_signal.emit("No ticks returned from MT5.")
# 4. Live Streaming (Only if not historical)
if not is_historical:
self.status_signal.emit("Live streaming active...")
last_time = now_utc
if not ticks_df.empty:
last_time = ticks_df['datetime'].iloc[-1].to_pydatetime()
while self.running:
time.sleep(1.0)
cur_time = datetime.now(timezone.utc)
new_ticks = self.mt5_interface.get_ticks(self.symbol, last_time, cur_time + timedelta(seconds=1))
if not new_ticks.empty:
new_ticks = new_ticks[new_ticks['datetime'] > last_time]
if not new_ticks.empty:
self.market_profile.update(new_ticks)
last_time = new_ticks['datetime'].iloc[-1].to_pydatetime()
# Emit Tick Data
# print(f"DEBUG: Live Tick Update: {len(new_ticks)}")
self.data_signal.emit(new_ticks, self.market_profile.counts)
# Emit Level Update
v, l, p = self.market_profile.get_vah_val_poc()
if v is not None:
ts_now = cur_time.timestamp()
# print(f"DEBUG: Live Level Update: POC {p}")
self.levels_signal.emit([ts_now], [v], [l], [p])
else:
self.status_signal.emit("Historical view loaded. Live stream inactive.")
def stop(self):
self.running = False
self.mt5_interface.shutdown()
self.wait()