import time from datetime import datetime, timedelta, timezone import pandas as pd import numpy as np from PyQt6.QtCore import QThread, pyqtSignal, QObject from src.core.mt5_interface import MT5Interface from src.core.market_profile import MarketProfile class DataWorker(QThread): # Signals data_signal = pyqtSignal(object, object) # ticks_df, profile_counts levels_signal = pyqtSignal(object, object, object, object) # times, vah, val, poc (can be arrays or scalars) status_signal = pyqtSignal(str) finished_signal = pyqtSignal() def __init__(self, symbol, date_obj, multiplier=1.0): super().__init__() self.symbol = symbol self.date_obj = date_obj self.multiplier = multiplier self.running = True self.mt5_interface = MT5Interface() self.market_profile = MarketProfile(multiplier=self.multiplier) def run(self): self.status_signal.emit(f"Connecting to MT5... (Multiplier: {self.multiplier}x)") if not self.mt5_interface.initialize(): self.status_signal.emit("Failed to connect to MT5.") self.finished_signal.emit() return # Calculate session times # Target Date (00:00 UTC of the selected day) target_date_utc = datetime(self.date_obj.year, self.date_obj.month, self.date_obj.day, tzinfo=timezone.utc) # Establishment Start: 22:00 UTC previous day start_establishment = target_date_utc - timedelta(days=1) + timedelta(hours=22) # Establishment End (Developing Start): 00:00 UTC target day end_establishment = target_date_utc # Session End: 00:00 UTC next day (24h later) end_session = target_date_utc + timedelta(days=1) # Current time now_utc = datetime.now(timezone.utc) # Determine Fetch Range is_historical = end_session < now_utc fetch_end = end_session if is_historical else now_utc self.status_signal.emit(f"Fetch Range: {start_establishment} to {fetch_end} ...") # 1. Fetch History ticks_df = self.mt5_interface.get_ticks(self.symbol, start_establishment, fetch_end) if not ticks_df.empty: # Split Data mask_est = (ticks_df['datetime'] >= start_establishment) & (ticks_df['datetime'] < end_establishment) df_est = ticks_df.loc[mask_est] mask_dev = (ticks_df['datetime'] >= end_establishment) df_dev = ticks_df.loc[mask_dev] self.status_signal.emit(f"Data: {len(ticks_df)} total. Est: {len(df_est)}, Dev: {len(df_dev)}") # 2. Process Establishment Phase if not df_est.empty: self.market_profile.update(df_est) self.status_signal.emit(f"Profile Established. Ticks: {self.market_profile.total_ticks}") else: self.status_signal.emit("Warning: No Establishment Data (22:00-00:00). Starting empty.") # 3. Process Developing Phase (History Replay) dev_times = [] dev_vah = [] dev_val = [] dev_poc = [] if not df_dev.empty: # Resample to 1 minute to calculate trajectory df_dev_indexed = df_dev.set_index('datetime') grouped = df_dev_indexed.resample('1min') count_steps = 0 for time_idx, group in grouped: if group.empty: continue # Update profile group_reset = group.reset_index() self.market_profile.update(group_reset) # Calculate levels v, l, p = self.market_profile.get_vah_val_poc() if v is not None: # Use timestamp ts_float = time_idx.timestamp() dev_times.append(ts_float) dev_vah.append(v) dev_val.append(l) dev_poc.append(p) count_steps += 1 self.status_signal.emit(f"Calculated {count_steps} developing points.") # Emit History Data: Ticks # If extremely large, maybe downsample? But for now send all. print(f"DEBUG: Worker Emitting Ticks: {len(ticks_df)}") self.data_signal.emit(ticks_df, self.market_profile.counts) # Emit Levels if dev_times: print(f"DEBUG: Worker Emitting Levels: {len(dev_times)} pts. Times: {dev_times[0]} -> {dev_times[-1]}") self.levels_signal.emit( np.array(dev_times), np.array(dev_vah), np.array(dev_val), np.array(dev_poc) ) else: print("DEBUG: No developing levels calculated to emit.") self.status_signal.emit("No developing levels calculated (insufficient dev data?).") else: self.status_signal.emit("No ticks returned from MT5.") # 4. Live Streaming (Only if not historical) if not is_historical: self.status_signal.emit("Live streaming active...") last_time = now_utc if not ticks_df.empty: last_time = ticks_df['datetime'].iloc[-1].to_pydatetime() while self.running: time.sleep(1.0) cur_time = datetime.now(timezone.utc) new_ticks = self.mt5_interface.get_ticks(self.symbol, last_time, cur_time + timedelta(seconds=1)) if not new_ticks.empty: new_ticks = new_ticks[new_ticks['datetime'] > last_time] if not new_ticks.empty: self.market_profile.update(new_ticks) last_time = new_ticks['datetime'].iloc[-1].to_pydatetime() # Emit Tick Data # print(f"DEBUG: Live Tick Update: {len(new_ticks)}") self.data_signal.emit(new_ticks, self.market_profile.counts) # Emit Level Update v, l, p = self.market_profile.get_vah_val_poc() if v is not None: ts_now = cur_time.timestamp() # print(f"DEBUG: Live Level Update: POC {p}") self.levels_signal.emit([ts_now], [v], [l], [p]) else: self.status_signal.emit("Historical view loaded. Live stream inactive.") def stop(self): self.running = False self.mt5_interface.shutdown() self.wait()