Spaces:
Paused
Paused
File size: 11,397 Bytes
96e0cc2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
import logging
from typing import Dict, List, Optional, Tuple, Any
import yaml
import time
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class RiskManager:
def __init__(self, exchange_client):
self.exchange = exchange_client
self.settings = yaml.safe_load(open("config/settings.yaml"))
self.leverage = self.settings["trading"]["leverage"]
self.tp_percent = self.settings["trading"]["tp_percent"]
self.sl_percent = self.settings["trading"]["sl_percent"]
self.risk_per_trade = self.settings["risk"]["risk_per_trade"]
self.max_daily_loss = 100
self.daily_pnl = 0.0
self.daily_trades = 0
self.max_daily_trades = 50
self.max_open_positions = 1
self.min_order_size = 0.001
self.open_positions = {}
self.emergency_stop = False
self.last_reset_time = datetime.now()
self.trade_history = []
self.win_count = 0
self.loss_count = 0
def calculate_position_size(self, symbol: str, entry_price: float, side: str) -> float:
try:
balance_data = self.exchange.get_balance()
if not balance_data:
logger.error("Failed to get account balance")
return 0.0
usdt_balance = 0.0
for coin in balance_data:
if coin.get("coin") == "USDT":
usdt_balance = float(coin.get("walletBalance", 0))
break
if usdt_balance <= 0:
logger.warning("Insufficient USDT balance")
return 0.0
risk_amount = usdt_balance * self.risk_per_trade
sl_distance = entry_price * self.sl_percent
position_size = risk_amount / (sl_distance * self.leverage)
position_size = max(position_size, self.min_order_size)
if self.daily_pnl < -self.max_daily_loss:
logger.warning(f"Daily loss limit reached: {self.daily_pnl}")
return 0.0
if self.daily_trades >= self.max_daily_trades:
logger.warning("Max daily trades reached")
return 0.0
if symbol in self.open_positions:
logger.warning(f"Position already open for {symbol}")
return 0.0
position_size = self._validate_position_size(symbol, position_size, entry_price)
logger.info(f"Calculated position size: {position_size:.4f} for {symbol}")
return position_size
except Exception as e:
logger.error(f"Error calculating position size: {e}")
return 0.0
def _validate_position_size(self, symbol: str, size: float, price: float) -> float:
try:
min_size = 0.001
max_size = 100.0
validated_size = max(min_size, min(size, max_size))
if symbol.startswith('BTC'):
validated_size = round(validated_size, 3)
elif symbol.startswith('ETH'):
validated_size = round(validated_size, 2)
else:
validated_size = round(validated_size, 1)
return validated_size
except Exception as e:
logger.error(f"Error validating position size: {e}")
return 0.0
def validate_entry_signal(self, symbol: str, signal: str, confidence: float) -> bool:
try:
if self.emergency_stop:
logger.warning("Emergency stop activated")
return False
min_confidence = 0.6
if confidence < min_confidence:
logger.info(f"Signal confidence too low: {confidence}")
return False
if symbol in self.open_positions:
logger.warning(f"Position already exists for {symbol}")
return False
if not self._check_market_conditions(symbol):
return False
if self._is_high_volatility(symbol):
logger.warning(f"High volatility detected for {symbol}")
return False
return True
except Exception as e:
logger.error(f"Error validating entry signal: {e}")
return False
def _check_market_conditions(self, symbol: str) -> bool:
try:
ticker = self.exchange.get_ticker(symbol)
if not ticker:
return False
volume_24h = float(ticker.get("volume24h", 0))
if volume_24h < 100000:
logger.warning(f"Low volume for {symbol}: {volume_24h}")
return False
bid_price = float(ticker.get("bid1Price", 0))
ask_price = float(ticker.get("ask1Price", 0))
if bid_price == 0 or ask_price == 0:
return False
spread = (ask_price - bid_price) / bid_price
max_spread = 0.001
if spread > max_spread:
logger.warning(f"Spread too wide for {symbol}: {spread}")
return False
return True
except Exception as e:
logger.error(f"Error checking market conditions: {e}")
return False
def _is_high_volatility(self, symbol: str) -> bool:
try:
prices = self.exchange.get_kline_data(symbol, interval="1", limit=10)
if not prices or len(prices) < 5:
return True
high = max(float(candle[2]) for candle in prices)
low = min(float(candle[3]) for candle in prices)
current_price = float(prices[-1][4])
volatility = (high - low) / current_price
high_vol_threshold = 0.02
return volatility > high_vol_threshold
except Exception as e:
logger.error(f"Error checking volatility: {e}")
return True
def update_position(self, symbol: str, position_data: Dict[str, Any]):
try:
self.open_positions[symbol] = {
'entry_time': datetime.now(),
'entry_price': float(position_data.get('avgPrice', 0)),
'size': float(position_data.get('qty', 0)),
'side': position_data.get('side', ''),
'leverage': self.leverage
}
logger.info(f"Position updated for {symbol}: {self.open_positions[symbol]}")
except Exception as e:
logger.error(f"Error updating position for {symbol}: {e}")
def close_position(self, symbol: str, reason: str = "manual"):
try:
if symbol not in self.open_positions:
logger.warning(f"No position found for {symbol}")
return False
position = self.open_positions[symbol]
exit_price = self.exchange.get_ticker(symbol)
if exit_price:
exit_price = float(exit_price.get("lastPrice", 0))
entry_price = position['entry_price']
if position['side'] == 'Buy':
pnl = (exit_price - entry_price) / entry_price * position['size'] * self.leverage
else:
pnl = (entry_price - exit_price) / entry_price * position['size'] * self.leverage
self.daily_pnl += pnl
if pnl > 0:
self.win_count += 1
else:
self.loss_count += 1
self._record_trade(symbol, position, exit_price, pnl, reason)
del self.open_positions[symbol]
self.daily_trades += 1
logger.info(f"Position closed for {symbol}, reason: {reason}")
return True
except Exception as e:
logger.error(f"Error closing position for {symbol}: {e}")
return False
def _record_trade(self, symbol: str, position: Dict, exit_price: float, pnl: float, reason: str):
try:
trade = {
'symbol': symbol,
'entry_time': position['entry_time'],
'exit_time': datetime.now(),
'entry_price': position['entry_price'],
'exit_price': exit_price,
'size': position['size'],
'side': position['side'],
'pnl': pnl,
'reason': reason,
'leverage': position['leverage']
}
self.trade_history.append(trade)
if len(self.trade_history) > 1000:
self.trade_history = self.trade_history[-1000:]
except Exception as e:
logger.error(f"Error recording trade: {e}")
def check_tp_sl_hit(self, symbol: str) -> Optional[str]:
try:
if symbol not in self.open_positions:
return None
position = self.open_positions[symbol]
current_price = self.exchange.get_ticker(symbol)
if not current_price:
return None
current_price = float(current_price.get("lastPrice", 0))
entry_price = position['entry_price']
if position['side'] == 'Buy':
tp_price = entry_price * (1 + self.tp_percent)
sl_price = entry_price * (1 - self.sl_percent)
if current_price >= tp_price:
return "TP"
elif current_price <= sl_price:
return "SL"
else:
tp_price = entry_price * (1 - self.tp_percent)
sl_price = entry_price * (1 + self.sl_percent)
if current_price <= tp_price:
return "TP"
elif current_price >= sl_price:
return "SL"
return None
except Exception as e:
logger.error(f"Error checking TP/SL for {symbol}: {e}")
return None
def emergency_stop_all(self):
try:
self.emergency_stop = True
for symbol in list(self.open_positions.keys()):
self.exchange.close_position(symbol)
self.close_position(symbol, "emergency_stop")
logger.critical("Emergency stop activated - all positions closed")
except Exception as e:
logger.error(f"Error in emergency stop: {e}")
def reset_daily_stats(self):
try:
now = datetime.now()
if now.date() > self.last_reset_time.date():
self.daily_pnl = 0.0
self.daily_trades = 0
self.last_reset_time = now
logger.info("Daily statistics reset")
except Exception as e:
logger.error(f"Error resetting daily stats: {e}")
def get_risk_status(self) -> Dict[str, Any]:
try:
total_trades = self.win_count + self.loss_count
win_rate = self.win_count / total_trades if total_trades > 0 else 0.0
status = {
'emergency_stop': self.emergency_stop,
'open_positions': len(self.open_positions),
'daily_pnl': self.daily_pnl,
'daily_trades': self.daily_trades,
'win_rate': win_rate,
'total_trades': total_trades,
'positions': list(self.open_positions.keys())
}
return status
except Exception as e:
logger.error(f"Error getting risk status: {e}")
return {'error': str(e)}
|