scalperBot / core /risk.py
nexusbert's picture
Upload 36 files
96e0cc2 verified
import logging
from typing import Dict, List, Optional, Tuple, Any
import yaml
import time
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class RiskManager:
def __init__(self, exchange_client):
self.exchange = exchange_client
self.settings = yaml.safe_load(open("config/settings.yaml"))
self.leverage = self.settings["trading"]["leverage"]
self.tp_percent = self.settings["trading"]["tp_percent"]
self.sl_percent = self.settings["trading"]["sl_percent"]
self.risk_per_trade = self.settings["risk"]["risk_per_trade"]
self.max_daily_loss = 100
self.daily_pnl = 0.0
self.daily_trades = 0
self.max_daily_trades = 50
self.max_open_positions = 1
self.min_order_size = 0.001
self.open_positions = {}
self.emergency_stop = False
self.last_reset_time = datetime.now()
self.trade_history = []
self.win_count = 0
self.loss_count = 0
def calculate_position_size(self, symbol: str, entry_price: float, side: str) -> float:
try:
balance_data = self.exchange.get_balance()
if not balance_data:
logger.error("Failed to get account balance")
return 0.0
usdt_balance = 0.0
for coin in balance_data:
if coin.get("coin") == "USDT":
usdt_balance = float(coin.get("walletBalance", 0))
break
if usdt_balance <= 0:
logger.warning("Insufficient USDT balance")
return 0.0
risk_amount = usdt_balance * self.risk_per_trade
sl_distance = entry_price * self.sl_percent
position_size = risk_amount / (sl_distance * self.leverage)
position_size = max(position_size, self.min_order_size)
if self.daily_pnl < -self.max_daily_loss:
logger.warning(f"Daily loss limit reached: {self.daily_pnl}")
return 0.0
if self.daily_trades >= self.max_daily_trades:
logger.warning("Max daily trades reached")
return 0.0
if symbol in self.open_positions:
logger.warning(f"Position already open for {symbol}")
return 0.0
position_size = self._validate_position_size(symbol, position_size, entry_price)
logger.info(f"Calculated position size: {position_size:.4f} for {symbol}")
return position_size
except Exception as e:
logger.error(f"Error calculating position size: {e}")
return 0.0
def _validate_position_size(self, symbol: str, size: float, price: float) -> float:
try:
min_size = 0.001
max_size = 100.0
validated_size = max(min_size, min(size, max_size))
if symbol.startswith('BTC'):
validated_size = round(validated_size, 3)
elif symbol.startswith('ETH'):
validated_size = round(validated_size, 2)
else:
validated_size = round(validated_size, 1)
return validated_size
except Exception as e:
logger.error(f"Error validating position size: {e}")
return 0.0
def validate_entry_signal(self, symbol: str, signal: str, confidence: float) -> bool:
try:
if self.emergency_stop:
logger.warning("Emergency stop activated")
return False
min_confidence = 0.6
if confidence < min_confidence:
logger.info(f"Signal confidence too low: {confidence}")
return False
if symbol in self.open_positions:
logger.warning(f"Position already exists for {symbol}")
return False
if not self._check_market_conditions(symbol):
return False
if self._is_high_volatility(symbol):
logger.warning(f"High volatility detected for {symbol}")
return False
return True
except Exception as e:
logger.error(f"Error validating entry signal: {e}")
return False
def _check_market_conditions(self, symbol: str) -> bool:
try:
ticker = self.exchange.get_ticker(symbol)
if not ticker:
return False
volume_24h = float(ticker.get("volume24h", 0))
if volume_24h < 100000:
logger.warning(f"Low volume for {symbol}: {volume_24h}")
return False
bid_price = float(ticker.get("bid1Price", 0))
ask_price = float(ticker.get("ask1Price", 0))
if bid_price == 0 or ask_price == 0:
return False
spread = (ask_price - bid_price) / bid_price
max_spread = 0.001
if spread > max_spread:
logger.warning(f"Spread too wide for {symbol}: {spread}")
return False
return True
except Exception as e:
logger.error(f"Error checking market conditions: {e}")
return False
def _is_high_volatility(self, symbol: str) -> bool:
try:
prices = self.exchange.get_kline_data(symbol, interval="1", limit=10)
if not prices or len(prices) < 5:
return True
high = max(float(candle[2]) for candle in prices)
low = min(float(candle[3]) for candle in prices)
current_price = float(prices[-1][4])
volatility = (high - low) / current_price
high_vol_threshold = 0.02
return volatility > high_vol_threshold
except Exception as e:
logger.error(f"Error checking volatility: {e}")
return True
def update_position(self, symbol: str, position_data: Dict[str, Any]):
try:
self.open_positions[symbol] = {
'entry_time': datetime.now(),
'entry_price': float(position_data.get('avgPrice', 0)),
'size': float(position_data.get('qty', 0)),
'side': position_data.get('side', ''),
'leverage': self.leverage
}
logger.info(f"Position updated for {symbol}: {self.open_positions[symbol]}")
except Exception as e:
logger.error(f"Error updating position for {symbol}: {e}")
def close_position(self, symbol: str, reason: str = "manual"):
try:
if symbol not in self.open_positions:
logger.warning(f"No position found for {symbol}")
return False
position = self.open_positions[symbol]
exit_price = self.exchange.get_ticker(symbol)
if exit_price:
exit_price = float(exit_price.get("lastPrice", 0))
entry_price = position['entry_price']
if position['side'] == 'Buy':
pnl = (exit_price - entry_price) / entry_price * position['size'] * self.leverage
else:
pnl = (entry_price - exit_price) / entry_price * position['size'] * self.leverage
self.daily_pnl += pnl
if pnl > 0:
self.win_count += 1
else:
self.loss_count += 1
self._record_trade(symbol, position, exit_price, pnl, reason)
del self.open_positions[symbol]
self.daily_trades += 1
logger.info(f"Position closed for {symbol}, reason: {reason}")
return True
except Exception as e:
logger.error(f"Error closing position for {symbol}: {e}")
return False
def _record_trade(self, symbol: str, position: Dict, exit_price: float, pnl: float, reason: str):
try:
trade = {
'symbol': symbol,
'entry_time': position['entry_time'],
'exit_time': datetime.now(),
'entry_price': position['entry_price'],
'exit_price': exit_price,
'size': position['size'],
'side': position['side'],
'pnl': pnl,
'reason': reason,
'leverage': position['leverage']
}
self.trade_history.append(trade)
if len(self.trade_history) > 1000:
self.trade_history = self.trade_history[-1000:]
except Exception as e:
logger.error(f"Error recording trade: {e}")
def check_tp_sl_hit(self, symbol: str) -> Optional[str]:
try:
if symbol not in self.open_positions:
return None
position = self.open_positions[symbol]
current_price = self.exchange.get_ticker(symbol)
if not current_price:
return None
current_price = float(current_price.get("lastPrice", 0))
entry_price = position['entry_price']
if position['side'] == 'Buy':
tp_price = entry_price * (1 + self.tp_percent)
sl_price = entry_price * (1 - self.sl_percent)
if current_price >= tp_price:
return "TP"
elif current_price <= sl_price:
return "SL"
else:
tp_price = entry_price * (1 - self.tp_percent)
sl_price = entry_price * (1 + self.sl_percent)
if current_price <= tp_price:
return "TP"
elif current_price >= sl_price:
return "SL"
return None
except Exception as e:
logger.error(f"Error checking TP/SL for {symbol}: {e}")
return None
def emergency_stop_all(self):
try:
self.emergency_stop = True
for symbol in list(self.open_positions.keys()):
self.exchange.close_position(symbol)
self.close_position(symbol, "emergency_stop")
logger.critical("Emergency stop activated - all positions closed")
except Exception as e:
logger.error(f"Error in emergency stop: {e}")
def reset_daily_stats(self):
try:
now = datetime.now()
if now.date() > self.last_reset_time.date():
self.daily_pnl = 0.0
self.daily_trades = 0
self.last_reset_time = now
logger.info("Daily statistics reset")
except Exception as e:
logger.error(f"Error resetting daily stats: {e}")
def get_risk_status(self) -> Dict[str, Any]:
try:
total_trades = self.win_count + self.loss_count
win_rate = self.win_count / total_trades if total_trades > 0 else 0.0
status = {
'emergency_stop': self.emergency_stop,
'open_positions': len(self.open_positions),
'daily_pnl': self.daily_pnl,
'daily_trades': self.daily_trades,
'win_rate': win_rate,
'total_trades': total_trades,
'positions': list(self.open_positions.keys())
}
return status
except Exception as e:
logger.error(f"Error getting risk status: {e}")
return {'error': str(e)}