Spaces:
Paused
Paused
| import asyncio | |
| import logging | |
| import time | |
| from typing import Dict, List, Optional, Any | |
| from datetime import datetime, timedelta | |
| import yaml | |
| from core.exchange import BybitExchange | |
| from core.strategy import ScalpingStrategy | |
| from core.risk import RiskManager | |
| from core.data_engine import DataEngine | |
| from services.logger import log, log_trade, log_error, log_performance | |
| logger = logging.getLogger(__name__) | |
| class TradeMonitor: | |
| def __init__(self, exchange: BybitExchange, strategy: ScalpingStrategy, | |
| risk_manager: RiskManager, data_engine: DataEngine): | |
| self.exchange = exchange | |
| self.strategy = strategy | |
| self.risk_manager = risk_manager | |
| self.data_engine = data_engine | |
| self.settings = yaml.safe_load(open("config/settings.yaml")) | |
| self.pairs = yaml.safe_load(open("config/pairs.yaml"))["pairs"] | |
| self.is_running = False | |
| self.monitoring_interval = 2 | |
| self.last_signal_check = {} | |
| self.signal_cooldown = 30 | |
| self.performance_check_interval = 300 | |
| self.last_performance_check = datetime.now() | |
| async def start_monitoring(self): | |
| self.is_running = True | |
| log("π Starting trade monitoring loop") | |
| try: | |
| while self.is_running: | |
| await self._monitoring_cycle() | |
| await asyncio.sleep(self.monitoring_interval) | |
| except Exception as e: | |
| log_error("monitoring_loop", f"Monitoring loop crashed: {e}") | |
| self.is_running = False | |
| def stop_monitoring(self): | |
| self.is_running = False | |
| log("π Trade monitoring stopped") | |
| async def _monitoring_cycle(self): | |
| try: | |
| self.risk_manager.reset_daily_stats() | |
| await self._check_open_positions() | |
| await self._check_signals() | |
| await self._periodic_performance_check() | |
| self._health_check() | |
| except Exception as e: | |
| log_error("monitoring_cycle", f"Error in monitoring cycle: {e}") | |
| async def _check_open_positions(self): | |
| try: | |
| positions = self.exchange.get_positions() | |
| for position in positions: | |
| symbol = position.get("symbol") | |
| if not symbol: | |
| continue | |
| size = float(position.get("size", 0)) | |
| if abs(size) < 0.001: | |
| continue | |
| exit_reason = self.risk_manager.check_tp_sl_hit(symbol) | |
| if exit_reason: | |
| log(f"π― {exit_reason} hit for {symbol}") | |
| close_result = self.exchange.close_position(symbol) | |
| if close_result: | |
| self.risk_manager.close_position(symbol, exit_reason) | |
| exit_price = float(close_result.get("avgPrice", 0)) | |
| log_trade(symbol, "CLOSE", abs(size), exit_price, | |
| reason=exit_reason) | |
| self.last_signal_check[symbol] = datetime.now() - timedelta(seconds=self.signal_cooldown) | |
| else: | |
| log_error("position_close", f"Failed to close position for {symbol}") | |
| except Exception as e: | |
| log_error("position_check", f"Error checking positions: {e}") | |
| async def _check_signals(self): | |
| try: | |
| current_time = datetime.now() | |
| for symbol in self.pairs: | |
| last_check = self.last_signal_check.get(symbol) | |
| if last_check and (current_time - last_check).seconds < self.signal_cooldown: | |
| continue | |
| signal, confidence, price = self.strategy.generate_signal(symbol) | |
| if signal in ["BUY", "SELL"]: | |
| if self.risk_manager.validate_entry_signal(symbol, signal, confidence): | |
| await self._execute_signal(symbol, signal, confidence, price) | |
| self.last_signal_check[symbol] = current_time | |
| except Exception as e: | |
| log_error("signal_check", f"Error checking signals: {e}") | |
| async def _execute_signal(self, symbol: str, signal: str, confidence: float, price: float): | |
| try: | |
| log(f"π Executing {signal} signal for {symbol} at {price} (conf: {confidence:.2f})") | |
| qty = self.risk_manager.calculate_position_size(symbol, price, signal) | |
| if qty <= 0: | |
| log(f"β Invalid position size for {symbol}: {qty}") | |
| return | |
| self.exchange.set_leverage(symbol) | |
| order = self.exchange.market_order(symbol, signal, qty) | |
| if order: | |
| entry_price = float(order.get("avgPrice", price)) | |
| tp_percent = self.settings["trading"]["tp_percent"] | |
| sl_percent = self.settings["trading"]["sl_percent"] | |
| if signal == "BUY": | |
| tp_price = entry_price * (1 + tp_percent) | |
| sl_price = entry_price * (1 - sl_percent) | |
| else: | |
| tp_price = entry_price * (1 - tp_percent) | |
| sl_price = entry_price * (1 + sl_percent) | |
| tp_sl_result = self.exchange.set_tp_sl(symbol, signal, entry_price, tp_price, sl_price) | |
| if tp_sl_result: | |
| log(f"β TP/SL set for {symbol}: TP={tp_price:.4f}, SL={sl_price:.4f}") | |
| self.risk_manager.update_position(symbol, order) | |
| log_trade(symbol, signal, qty, entry_price, "MARKET") | |
| else: | |
| log_error("tp_sl_setup", f"Failed to set TP/SL for {symbol}") | |
| self.exchange.close_position(symbol) | |
| else: | |
| log_error("order_execution", f"Failed to execute order for {symbol}") | |
| except Exception as e: | |
| log_error("signal_execution", f"Error executing signal for {symbol}: {e}") | |
| async def _periodic_performance_check(self): | |
| try: | |
| current_time = datetime.now() | |
| if (current_time - self.last_performance_check).seconds >= self.performance_check_interval: | |
| risk_status = self.risk_manager.get_risk_status() | |
| from services.logger import logger_instance | |
| trade_stats = logger_instance.get_trade_statistics() | |
| performance_data = { | |
| **risk_status, | |
| **trade_stats | |
| } | |
| log_performance("ALL", "5min", performance_data) | |
| self.last_performance_check = current_time | |
| if risk_status.get('daily_pnl', 0) < -self.risk_manager.max_daily_loss: | |
| log("π¨ Daily loss limit reached - activating emergency stop") | |
| self.risk_manager.emergency_stop_all() | |
| except Exception as e: | |
| log_error("performance_check", f"Error in performance check: {e}") | |
| def _health_check(self): | |
| try: | |
| server_time = self.exchange.get_server_time() | |
| if not server_time: | |
| log_error("health_check", "Exchange connection unhealthy") | |
| buffer_status = self.data_engine.get_buffer_status() | |
| for symbol, buffers in buffer_status.get('price_buffers', {}).items(): | |
| if buffers < 10: | |
| log(f"β οΈ Low price buffer for {symbol}: {buffers} points") | |
| except Exception as e: | |
| log_error("health_check", f"Health check error: {e}") | |
| def get_monitoring_status(self) -> Dict[str, Any]: | |
| try: | |
| return { | |
| 'is_running': self.is_running, | |
| 'monitoring_interval': self.monitoring_interval, | |
| 'active_symbols': len(self.pairs), | |
| 'open_positions': len(self.risk_manager.open_positions), | |
| 'last_performance_check': self.last_performance_check.isoformat(), | |
| 'signal_cooldowns': { | |
| symbol: self.last_signal_check.get(symbol).isoformat() | |
| for symbol in self.last_signal_check.keys() | |
| if self.last_signal_check.get(symbol) | |
| } | |
| } | |
| except Exception as e: | |
| return {'error': str(e)} | |