Spaces:
Sleeping
Sleeping
File size: 8,249 Bytes
96e0cc2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
import asyncio
import logging
import time
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
import yaml
from core.exchange import BybitExchange
from core.strategy import ScalpingStrategy
from core.risk import RiskManager
from core.data_engine import DataEngine
from services.logger import log, log_trade, log_error, log_performance
logger = logging.getLogger(__name__)
class TradeMonitor:
def __init__(self, exchange: BybitExchange, strategy: ScalpingStrategy,
risk_manager: RiskManager, data_engine: DataEngine):
self.exchange = exchange
self.strategy = strategy
self.risk_manager = risk_manager
self.data_engine = data_engine
self.settings = yaml.safe_load(open("config/settings.yaml"))
self.pairs = yaml.safe_load(open("config/pairs.yaml"))["pairs"]
self.is_running = False
self.monitoring_interval = 2
self.last_signal_check = {}
self.signal_cooldown = 30
self.performance_check_interval = 300
self.last_performance_check = datetime.now()
async def start_monitoring(self):
self.is_running = True
log("π Starting trade monitoring loop")
try:
while self.is_running:
await self._monitoring_cycle()
await asyncio.sleep(self.monitoring_interval)
except Exception as e:
log_error("monitoring_loop", f"Monitoring loop crashed: {e}")
self.is_running = False
def stop_monitoring(self):
self.is_running = False
log("π Trade monitoring stopped")
async def _monitoring_cycle(self):
try:
self.risk_manager.reset_daily_stats()
await self._check_open_positions()
await self._check_signals()
await self._periodic_performance_check()
self._health_check()
except Exception as e:
log_error("monitoring_cycle", f"Error in monitoring cycle: {e}")
async def _check_open_positions(self):
try:
positions = self.exchange.get_positions()
for position in positions:
symbol = position.get("symbol")
if not symbol:
continue
size = float(position.get("size", 0))
if abs(size) < 0.001:
continue
exit_reason = self.risk_manager.check_tp_sl_hit(symbol)
if exit_reason:
log(f"π― {exit_reason} hit for {symbol}")
close_result = self.exchange.close_position(symbol)
if close_result:
self.risk_manager.close_position(symbol, exit_reason)
exit_price = float(close_result.get("avgPrice", 0))
log_trade(symbol, "CLOSE", abs(size), exit_price,
reason=exit_reason)
self.last_signal_check[symbol] = datetime.now() - timedelta(seconds=self.signal_cooldown)
else:
log_error("position_close", f"Failed to close position for {symbol}")
except Exception as e:
log_error("position_check", f"Error checking positions: {e}")
async def _check_signals(self):
try:
current_time = datetime.now()
for symbol in self.pairs:
last_check = self.last_signal_check.get(symbol)
if last_check and (current_time - last_check).seconds < self.signal_cooldown:
continue
signal, confidence, price = self.strategy.generate_signal(symbol)
if signal in ["BUY", "SELL"]:
if self.risk_manager.validate_entry_signal(symbol, signal, confidence):
await self._execute_signal(symbol, signal, confidence, price)
self.last_signal_check[symbol] = current_time
except Exception as e:
log_error("signal_check", f"Error checking signals: {e}")
async def _execute_signal(self, symbol: str, signal: str, confidence: float, price: float):
try:
log(f"π Executing {signal} signal for {symbol} at {price} (conf: {confidence:.2f})")
qty = self.risk_manager.calculate_position_size(symbol, price, signal)
if qty <= 0:
log(f"β Invalid position size for {symbol}: {qty}")
return
self.exchange.set_leverage(symbol)
order = self.exchange.market_order(symbol, signal, qty)
if order:
entry_price = float(order.get("avgPrice", price))
tp_percent = self.settings["trading"]["tp_percent"]
sl_percent = self.settings["trading"]["sl_percent"]
if signal == "BUY":
tp_price = entry_price * (1 + tp_percent)
sl_price = entry_price * (1 - sl_percent)
else:
tp_price = entry_price * (1 - tp_percent)
sl_price = entry_price * (1 + sl_percent)
tp_sl_result = self.exchange.set_tp_sl(symbol, signal, entry_price, tp_price, sl_price)
if tp_sl_result:
log(f"β
TP/SL set for {symbol}: TP={tp_price:.4f}, SL={sl_price:.4f}")
self.risk_manager.update_position(symbol, order)
log_trade(symbol, signal, qty, entry_price, "MARKET")
else:
log_error("tp_sl_setup", f"Failed to set TP/SL for {symbol}")
self.exchange.close_position(symbol)
else:
log_error("order_execution", f"Failed to execute order for {symbol}")
except Exception as e:
log_error("signal_execution", f"Error executing signal for {symbol}: {e}")
async def _periodic_performance_check(self):
try:
current_time = datetime.now()
if (current_time - self.last_performance_check).seconds >= self.performance_check_interval:
risk_status = self.risk_manager.get_risk_status()
from services.logger import logger_instance
trade_stats = logger_instance.get_trade_statistics()
performance_data = {
**risk_status,
**trade_stats
}
log_performance("ALL", "5min", performance_data)
self.last_performance_check = current_time
if risk_status.get('daily_pnl', 0) < -self.risk_manager.max_daily_loss:
log("π¨ Daily loss limit reached - activating emergency stop")
self.risk_manager.emergency_stop_all()
except Exception as e:
log_error("performance_check", f"Error in performance check: {e}")
def _health_check(self):
try:
server_time = self.exchange.get_server_time()
if not server_time:
log_error("health_check", "Exchange connection unhealthy")
buffer_status = self.data_engine.get_buffer_status()
for symbol, buffers in buffer_status.get('price_buffers', {}).items():
if buffers < 10:
log(f"β οΈ Low price buffer for {symbol}: {buffers} points")
except Exception as e:
log_error("health_check", f"Health check error: {e}")
def get_monitoring_status(self) -> Dict[str, Any]:
try:
return {
'is_running': self.is_running,
'monitoring_interval': self.monitoring_interval,
'active_symbols': len(self.pairs),
'open_positions': len(self.risk_manager.open_positions),
'last_performance_check': self.last_performance_check.isoformat(),
'signal_cooldowns': {
symbol: self.last_signal_check.get(symbol).isoformat()
for symbol in self.last_signal_check.keys()
if self.last_signal_check.get(symbol)
}
}
except Exception as e:
return {'error': str(e)}
|