# File size: 7,231 Bytes | c99df4c
import time
from datetime import datetime, timedelta, timezone
import pandas as pd
import numpy as np
from PyQt6.QtCore import QThread, pyqtSignal, QObject
from src.core.mt5_interface import MT5Interface
from src.core.market_profile import MarketProfile
class DataWorker(QThread):
    """Worker thread that loads one session of ticks from MT5 and feeds a profile.

    For the selected date the worker fetches ticks from 22:00 UTC of the
    previous day onwards, feeds them into a ``MarketProfile`` and publishes
    the results through Qt signals.  When the selected session has not yet
    ended, it keeps polling MT5 once per second for new ticks until
    ``stop()`` is called.

    The session is split into two phases:

    * establishment: 22:00 UTC (previous day) up to 00:00 UTC (target day);
    * developing: 00:00 UTC (target day) onwards, replayed in 1-minute
      buckets so that a VAH/VAL/POC trajectory can be emitted.
    """

    # Signals
    data_signal = pyqtSignal(object, object)  # ticks_df, profile_counts
    # times, vah, val, poc (can be arrays or scalars)
    levels_signal = pyqtSignal(object, object, object, object)
    status_signal = pyqtSignal(str)
    # NOTE(review): within this class this is only emitted when the MT5
    # connection fails in run(); it is not emitted on normal completion.
    finished_signal = pyqtSignal()

    def __init__(self, symbol, date_obj, multiplier=1.0):
        """Store the request parameters and create the MT5/profile helpers.

        Args:
            symbol: Instrument identifier passed through to
                ``MT5Interface.get_ticks``.
            date_obj: Object exposing ``year``, ``month`` and ``day``; it
                selects the session to load.
            multiplier: Forwarded to ``MarketProfile(multiplier=...)`` and
                shown in the initial status message.
        """
        super().__init__()
        self.symbol = symbol
        self.date_obj = date_obj
        self.multiplier = multiplier
        # Cooperative stop flag; polled by the live-streaming loop.
        self.running = True
        self.mt5_interface = MT5Interface()
        self.market_profile = MarketProfile(multiplier=self.multiplier)

    def run(self):
        """Thread entry point: connect, load history, then optionally stream live."""
        self.status_signal.emit(f"Connecting to MT5... (Multiplier: {self.multiplier}x)")
        if not self.mt5_interface.initialize():
            self.status_signal.emit("Failed to connect to MT5.")
            self.finished_signal.emit()
            return

        # Session boundaries, all timezone-aware UTC.
        # Target Date (00:00 UTC of the selected day)
        target_date_utc = datetime(
            self.date_obj.year, self.date_obj.month, self.date_obj.day, tzinfo=timezone.utc
        )
        # Establishment Start: 22:00 UTC previous day
        start_establishment = target_date_utc - timedelta(days=1) + timedelta(hours=22)
        # Establishment End (Developing Start): 00:00 UTC target day
        end_establishment = target_date_utc
        # Session End: 00:00 UTC next day (24h later)
        end_session = target_date_utc + timedelta(days=1)

        # A session that is already over is fetched in full and never streamed;
        # an ongoing one is fetched up to "now" and then streamed.
        now_utc = datetime.now(timezone.utc)
        is_historical = end_session < now_utc
        fetch_end = end_session if is_historical else now_utc
        self.status_signal.emit(f"Fetch Range: {start_establishment} to {fetch_end} ...")

        # 1. Fetch History
        ticks_df = self.mt5_interface.get_ticks(self.symbol, start_establishment, fetch_end)
        if ticks_df.empty:
            self.status_signal.emit("No ticks returned from MT5.")
        else:
            self._load_history(ticks_df, start_establishment, end_establishment)

        # 4. Live Streaming (Only if not historical)
        if is_historical:
            self.status_signal.emit("Historical view loaded. Live stream inactive.")
            return

        self.status_signal.emit("Live streaming active...")
        # Resume from the last tick already processed, or from "now" when
        # the initial fetch returned nothing.
        last_time = now_utc
        if not ticks_df.empty:
            last_time = ticks_df['datetime'].iloc[-1].to_pydatetime()
        self._stream_live(last_time)

    def _load_history(self, ticks_df, start_establishment, end_establishment):
        """Feed the fetched ticks into the profile and emit ticks and levels.

        Args:
            ticks_df: Non-empty DataFrame with a ``'datetime'`` column.
                NOTE(review): the column is compared against tz-aware UTC
                datetimes, so it presumably holds tz-aware values — confirm
                against ``MT5Interface.get_ticks``.
            start_establishment: Inclusive start of the establishment phase.
            end_establishment: Exclusive end of the establishment phase and
                inclusive start of the developing phase.
        """
        # Split Data
        stamps = ticks_df['datetime']
        in_establishment = (stamps >= start_establishment) & (stamps < end_establishment)
        df_est = ticks_df.loc[in_establishment]
        df_dev = ticks_df.loc[stamps >= end_establishment]
        self.status_signal.emit(
            f"Data: {len(ticks_df)} total. Est: {len(df_est)}, Dev: {len(df_dev)}"
        )

        # 2. Process Establishment Phase
        if not df_est.empty:
            self.market_profile.update(df_est)
            self.status_signal.emit(
                f"Profile Established. Ticks: {self.market_profile.total_ticks}"
            )
        else:
            self.status_signal.emit(
                "Warning: No Establishment Data (22:00-00:00). Starting empty."
            )

        # 3. Process Developing Phase (History Replay)
        dev_times, dev_vah, dev_val, dev_poc = [], [], [], []
        if not df_dev.empty:
            dev_times, dev_vah, dev_val, dev_poc = self._replay_developing(df_dev)
            self.status_signal.emit(f"Calculated {len(dev_times)} developing points.")

        # Emit History Data: Ticks
        # If extremely large, maybe downsample? But for now send all.
        print(f"DEBUG: Worker Emitting Ticks: {len(ticks_df)}")
        self.data_signal.emit(ticks_df, self.market_profile.counts)

        # Emit Levels
        if dev_times:
            print(f"DEBUG: Worker Emitting Levels: {len(dev_times)} pts. Times: {dev_times[0]} -> {dev_times[-1]}")
            self.levels_signal.emit(
                np.array(dev_times),
                np.array(dev_vah),
                np.array(dev_val),
                np.array(dev_poc),
            )
        else:
            print("DEBUG: No developing levels calculated to emit.")
            self.status_signal.emit(
                "No developing levels calculated (insufficient dev data?)."
            )

    def _replay_developing(self, df_dev):
        """Replay developing-phase ticks minute by minute and record the levels.

        Each non-empty 1-minute bucket is added to the profile, after which
        the current levels are sampled.  Buckets for which
        ``get_vah_val_poc()`` reports ``None`` as its first value are
        skipped.

        Args:
            df_dev: Non-empty DataFrame of developing-phase ticks with a
                ``'datetime'`` column.

        Returns:
            Four parallel lists ``(times, vah, val, poc)`` where ``times``
            holds the bucket start as a POSIX timestamp (float).
        """
        times, vah_series, val_series, poc_series = [], [], [], []
        # Resample to 1 minute to calculate trajectory
        for bucket_start, bucket in df_dev.set_index('datetime').resample('1min'):
            if bucket.empty:
                continue
            # The profile is fed frames that carry 'datetime' as a column,
            # so restore it from the index before updating.
            self.market_profile.update(bucket.reset_index())
            vah, val, poc = self.market_profile.get_vah_val_poc()
            if vah is None:
                continue
            times.append(bucket_start.timestamp())
            vah_series.append(vah)
            val_series.append(val)
            poc_series.append(poc)
        return times, vah_series, val_series, poc_series

    def _stream_live(self, last_time):
        """Poll MT5 once per second and publish new ticks until stopped.

        Args:
            last_time: Timestamp of the newest tick already processed; only
                ticks strictly newer than it are consumed.
        """
        while self.running:
            time.sleep(1.0)
            # stop() may have been requested during the sleep; skip the
            # now-pointless fetch and emission in that case.
            if not self.running:
                break

            cur_time = datetime.now(timezone.utc)
            new_ticks = self.mt5_interface.get_ticks(
                self.symbol, last_time, cur_time + timedelta(seconds=1)
            )
            if new_ticks.empty:
                continue
            # The fetch range starts at last_time, so drop what was already seen.
            new_ticks = new_ticks[new_ticks['datetime'] > last_time]
            if new_ticks.empty:
                continue

            self.market_profile.update(new_ticks)
            last_time = new_ticks['datetime'].iloc[-1].to_pydatetime()

            # Emit Tick Data
            self.data_signal.emit(new_ticks, self.market_profile.counts)

            # Emit Level Update (stamped with the poll time, not the tick time)
            vah, val, poc = self.market_profile.get_vah_val_poc()
            if vah is not None:
                self.levels_signal.emit([cur_time.timestamp()], [vah], [val], [poc])

    def stop(self):
        """Ask the worker to stop, wait for it to finish, then close MT5.

        The thread is joined *before* the MT5 connection is shut down so
        that the worker can never be inside ``get_ticks()`` while the
        connection is being torn down.  Must not be called from the worker
        thread itself, since it blocks until ``run()`` has returned.
        """
        self.running = False
        self.wait()
        self.mt5_interface.shutdown()
|