| | import time
|
| | from datetime import datetime, timedelta, timezone
|
| | import pandas as pd
|
| | import numpy as np
|
| | from PyQt6.QtCore import QThread, pyqtSignal, QObject
|
| |
|
| | from src.core.mt5_interface import MT5Interface
|
| | from src.core.market_profile import MarketProfile
|
| |
|
class DataWorker(QThread):
    """Worker thread that loads ticks for one session date and tracks profile levels.

    On start it connects through ``MT5Interface``, fetches ticks from 22:00 UTC
    of the day before ``date_obj`` up to the end of that day (or "now" when the
    day is still in progress), feeds them into a ``MarketProfile`` and emits
    the results through Qt signals.  When the requested day has not ended yet,
    it keeps polling for new ticks once per second until ``stop()`` is called.
    """

    # Payload: (ticks DataFrame, self.market_profile.counts).
    data_signal = pyqtSignal(object, object)
    # Payload: (times, vah, val, poc).  The historical pass sends numpy arrays;
    # the live loop sends single-element lists.
    levels_signal = pyqtSignal(object, object, object, object)
    # Human-readable progress / warning text.
    status_signal = pyqtSignal(str)
    # NOTE(review): this is only emitted when the MT5 connection fails; a
    # normal return from run() does not emit it.  Confirm whether listeners
    # expect it on every exit path before changing that.
    finished_signal = pyqtSignal()

    def __init__(self, symbol, date_obj, multiplier=1.0):
        """Store the request parameters and create the MT5 / profile helpers.

        Args:
            symbol: Instrument identifier passed through to ``get_ticks``.
            date_obj: Object exposing ``year``, ``month`` and ``day``; it is
                interpreted as a UTC calendar date by ``run()``.
            multiplier: Forwarded to ``MarketProfile(multiplier=...)``.
        """
        super().__init__()
        self.symbol = symbol
        self.date_obj = date_obj
        self.multiplier = multiplier
        # Polled by the live loop; cleared by stop().
        self.running = True
        self.mt5_interface = MT5Interface()
        self.market_profile = MarketProfile(multiplier=self.multiplier)

    def run(self):
        """Thread entry point: load the session, then optionally stream live."""
        self.status_signal.emit(f"Connecting to MT5... (Multiplier: {self.multiplier}x)")
        if not self.mt5_interface.initialize():
            self.status_signal.emit("Failed to connect to MT5.")
            self.finished_signal.emit()
            return

        # Midnight (UTC) at the start of the requested date.
        target_date_utc = datetime(
            self.date_obj.year,
            self.date_obj.month,
            self.date_obj.day,
            tzinfo=timezone.utc,
        )

        # Establishment window: 22:00 UTC of the previous day up to midnight.
        start_establishment = target_date_utc - timedelta(days=1) + timedelta(hours=22)
        end_establishment = target_date_utc
        end_session = target_date_utc + timedelta(days=1)

        now_utc = datetime.now(timezone.utc)

        # A session that already ended is fetched in full; otherwise fetch up
        # to the current time and continue with live polling afterwards.
        is_historical = end_session < now_utc
        fetch_end = end_session if is_historical else now_utc

        self.status_signal.emit(f"Fetch Range: {start_establishment} to {fetch_end} ...")

        ticks_df = self.mt5_interface.get_ticks(self.symbol, start_establishment, fetch_end)

        if ticks_df.empty:
            self.status_signal.emit("No ticks returned from MT5.")
        else:
            self._process_history(ticks_df, start_establishment, end_establishment)

        if is_historical:
            self.status_signal.emit("Historical view loaded. Live stream inactive.")
            return

        self.status_signal.emit("Live streaming active...")

        # Resume from the last tick already processed, or from "now" when the
        # initial fetch returned nothing.
        last_time = now_utc
        if not ticks_df.empty:
            last_time = ticks_df['datetime'].iloc[-1].to_pydatetime()

        self._stream_live(last_time)

    def _process_history(self, ticks_df, start_establishment, end_establishment):
        """Build the profile from the fetched ticks and emit data and levels.

        Ticks in ``[start_establishment, end_establishment)`` seed the profile
        in a single update.  Ticks at or after ``end_establishment`` are then
        applied in 1-minute bins, sampling the profile levels after each bin.
        """
        tick_times = ticks_df['datetime']
        mask_est = (tick_times >= start_establishment) & (tick_times < end_establishment)
        df_est = ticks_df.loc[mask_est]

        mask_dev = tick_times >= end_establishment
        df_dev = ticks_df.loc[mask_dev]

        self.status_signal.emit(f"Data: {len(ticks_df)} total. Est: {len(df_est)}, Dev: {len(df_dev)}")

        if not df_est.empty:
            self.market_profile.update(df_est)
            self.status_signal.emit(f"Profile Established. Ticks: {self.market_profile.total_ticks}")
        else:
            self.status_signal.emit("Warning: No Establishment Data (22:00-00:00). Starting empty.")

        dev_times = []
        dev_vah = []
        dev_val = []
        dev_poc = []

        if not df_dev.empty:
            df_dev_indexed = df_dev.set_index('datetime')
            grouped = df_dev_indexed.resample('1min')

            count_steps = 0
            for time_idx, group in grouped:
                # resample() also yields bins that contain no ticks.
                if group.empty:
                    continue

                # Restore 'datetime' as a column before handing the bin to the
                # profile, matching the shape used for the establishment data.
                group_reset = group.reset_index()
                self.market_profile.update(group_reset)

                vah, val_level, poc = self.market_profile.get_vah_val_poc()
                if vah is not None:
                    # The bin label is converted to a float timestamp.
                    ts_float = time_idx.timestamp()
                    dev_times.append(ts_float)
                    dev_vah.append(vah)
                    dev_val.append(val_level)
                    dev_poc.append(poc)
                    count_steps += 1

            self.status_signal.emit(f"Calculated {count_steps} developing points.")

        print(f"DEBUG: Worker Emitting Ticks: {len(ticks_df)}")
        self.data_signal.emit(ticks_df, self.market_profile.counts)

        if dev_times:
            print(f"DEBUG: Worker Emitting Levels: {len(dev_times)} pts. Times: {dev_times[0]} -> {dev_times[-1]}")
            self.levels_signal.emit(
                np.array(dev_times),
                np.array(dev_vah),
                np.array(dev_val),
                np.array(dev_poc),
            )
        else:
            print("DEBUG: No developing levels calculated to emit.")
            self.status_signal.emit("No developing levels calculated (insufficient dev data?).")

    def _stream_live(self, last_time):
        """Poll for new ticks once per second until ``self.running`` is cleared.

        Args:
            last_time: Datetime of the most recent tick already processed;
                only ticks strictly after it are applied.
        """
        while self.running:
            time.sleep(1.0)

            # stop() may have been requested during the sleep; leave without
            # issuing another MT5 request or emitting further signals.
            if not self.running:
                break

            cur_time = datetime.now(timezone.utc)
            new_ticks = self.mt5_interface.get_ticks(
                self.symbol, last_time, cur_time + timedelta(seconds=1)
            )
            if new_ticks.empty:
                continue

            # The request starts at last_time, so drop anything that is not
            # strictly newer than the last processed tick.
            new_ticks = new_ticks[new_ticks['datetime'] > last_time]
            if new_ticks.empty:
                continue

            self.market_profile.update(new_ticks)
            last_time = new_ticks['datetime'].iloc[-1].to_pydatetime()

            self.data_signal.emit(new_ticks, self.market_profile.counts)

            vah, val_level, poc = self.market_profile.get_vah_val_poc()
            if vah is not None:
                ts_now = cur_time.timestamp()
                self.levels_signal.emit([ts_now], [vah], [val_level], [poc])

    def stop(self):
        """Ask the live loop to exit, wait for the thread, then close MT5.

        The connection is shut down only after ``wait()`` returns so that the
        worker thread can never call ``get_ticks`` on an interface that has
        already been shut down.
        """
        self.running = False
        self.wait()
        self.mt5_interface.shutdown()
|
| |
|