# scalperBot / api_server.py
# nexusbert's picture
# Upload 47 files
# 5116a2e verified
import asyncio
import os
import threading
import time
from collections import deque
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import uvicorn
import yaml
from dotenv import load_dotenv
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel

from core.data_engine import DataEngine
from core.exchange import BybitExchange
from core.risk import RiskManager
from core.strategy import ScalpingStrategy
from core.trade_monitor import TradeMonitor
from core.websockets import BybitWebSocket
from services.logger import log, log_trade, log_performance
from services.notifications import send_hf_notification, get_hf_notifications
from services.telegram import send_telegram
# Load environment variables from a .env file before the application object is created.
load_dotenv()
# FastAPI application; every route decorator in this module registers on it.
app = FastAPI(title="Bybit Scalping Bot API", version="1.0.0")
class TradeSession(BaseModel):
    """Pydantic model describing the parameters of one trading session.

    NOTE(review): no endpoint in this module references this model; the
    start endpoints take the same values as plain parameters instead.
    """
    # Trading pair the session applies to.
    symbol: str
    # Session length in hours; defaults to 18.
    duration_hours: int = 18
    # Optional trade cap; None when no cap is given.
    max_trades: Optional[int] = None
class BotStatus(BaseModel):
    """Response model returned by the ``/status`` endpoint."""
    # Overall running flag computed by the status endpoint.
    is_running: bool
    # One summary dict per tracked session.
    active_sessions: List[Dict]
    # P&L summed over sessions by the status endpoint.
    total_pnl: float
    # Trade count summed over sessions by the status endpoint.
    trades_today: int
    # Uptime rendered as a string.
    uptime: str
# Global bot state (module-level, in-memory only)
# symbol -> bot component dict built by create_bot_instance()
bot_instances = {}
# session_id -> session record dict written by run_trading_session()
active_sessions = {}
# session_id -> report dict written by generate_session_report()
session_reports = {}
def load_config():
    """Load the bot settings and the configured trading pairs from YAML.

    Returns:
        tuple: ``(settings, pairs)`` where ``settings`` is the parsed content
        of ``config/settings.yaml`` and ``pairs`` is the value stored under
        the top-level ``pairs`` key of ``config/pairs.yaml``.

    Raises:
        OSError: If either file cannot be opened.
        KeyError: If ``config/pairs.yaml`` has no ``pairs`` key.
    """
    # Context managers close the file handles deterministically instead of
    # leaving them open until garbage collection.
    with open("config/settings.yaml", encoding="utf-8") as settings_file:
        settings = yaml.safe_load(settings_file)
    with open("config/pairs.yaml", encoding="utf-8") as pairs_file:
        pairs = yaml.safe_load(pairs_file)["pairs"]
    return settings, pairs
def create_bot_instance(symbol: str):
    """Build the component bundle that trades a single symbol.

    Constructs the exchange client, data engine, strategy, risk manager,
    trade monitor and WebSocket client, and returns them in a dict together
    with the symbol, the creation time, an empty trade list and a zero P&L.
    """
    # The loaded values are not used here; the call is kept so the
    # configuration files are still read when an instance is created.
    _settings, _pairs = load_config()

    exchange_client = BybitExchange()
    engine = DataEngine()
    scalping_strategy = ScalpingStrategy(engine)
    risk = RiskManager(exchange_client)
    trade_monitor = TradeMonitor(exchange_client, scalping_strategy, risk, engine)

    def _forward_ws_message(data):
        # Tag every incoming message with the symbol this instance serves.
        return handle_ws_data(data, symbol)

    socket_client = BybitWebSocket(callback=_forward_ws_message)

    instance = {}
    instance['exchange'] = exchange_client
    instance['data_engine'] = engine
    instance['strategy'] = scalping_strategy
    instance['risk_manager'] = risk
    instance['monitor'] = trade_monitor
    instance['ws_client'] = socket_client
    instance['symbol'] = symbol
    instance['start_time'] = datetime.now()
    instance['trades'] = []
    instance['pnl'] = 0.0
    return instance
def handle_ws_data(data, symbol):
    """Route one decoded WebSocket message into the data engine for ``symbol``.

    Messages are dispatched on their ``topic``:

    * ``tickers.<symbol>``: forwards ``lastPrice`` to ``update_price``.
    * ``kline.<interval>.<symbol>``: forwards each candle to ``update_candle``.

    Anything else is ignored, as are messages for another symbol, messages
    for a symbol with no bot instance, and messages without a usable
    ``topic``/``data`` pair.

    Args:
        data: Decoded message (expected to be a dict).
        symbol: Symbol of the bot instance this callback belongs to.
    """
    bot = bot_instances.get(symbol)
    if bot is None or not isinstance(data, dict):
        return

    topic = data.get("topic")
    payload = data.get("data")
    # Messages may carry no topic or no data; skip them instead of raising
    # inside the WebSocket callback.
    if not isinstance(topic, str) or payload is None:
        return

    engine = bot['data_engine']
    parts = topic.split(".")

    if parts[0] == "tickers" and len(parts) >= 2:
        if parts[1] != symbol or not isinstance(payload, dict):
            return
        # Ticker delta messages only contain the fields that changed, so
        # lastPrice can legitimately be absent.
        last_price = payload.get("lastPrice")
        if last_price is not None:
            engine.update_price(symbol, float(last_price))
    elif parts[0] == "kline" and len(parts) >= 3:
        interval, ticker_symbol = parts[1], parts[2]
        if ticker_symbol != symbol or not isinstance(payload, list):
            return
        for candle in payload:
            candle_data = {
                'timestamp': candle['start'],
                'open': float(candle['open']),
                'high': float(candle['high']),
                'low': float(candle['low']),
                'close': float(candle['close']),
                'volume': float(candle['volume']),
            }
            engine.update_candle(symbol, interval, candle_data)
# Seconds the monitor task is given to finish on its own after
# stop_monitoring() before it is cancelled.
_MONITOR_STOP_GRACE_SECONDS = 5


async def _shutdown_session_bot(bot, monitor_task, symbol):
    """Best-effort teardown: stop the monitor, reap its task, close the socket."""
    try:
        bot['monitor'].stop_monitoring()
    except Exception as exc:
        log(f"Failed to stop monitor for {symbol}: {exc}")

    if monitor_task is not None:
        # Give the task a moment to wind down, then cancel it so it is not
        # left pending after the session is over.
        _finished, pending = await asyncio.wait(
            {monitor_task}, timeout=_MONITOR_STOP_GRACE_SECONDS
        )
        for task in pending:
            task.cancel()
        if not pending and not monitor_task.cancelled():
            monitor_error = monitor_task.exception()
            if monitor_error is not None:
                log(f"Monitor task for {symbol} ended with error: {monitor_error}")

    try:
        await bot['ws_client'].close()
    except Exception as exc:
        log(f"Failed to close WebSocket for {symbol}: {exc}")


async def run_trading_session(symbol: str, duration_hours: int, max_trades: Optional[int] = None):
    """Run a trading session for a specific symbol.

    Registers a session record in ``active_sessions``, connects and
    subscribes the symbol's WebSocket client, starts the trade monitor, and
    then polls once per second until one of the following happens:

    * the requested duration elapses,
    * the session status is changed away from ``'running'`` (the stop
      endpoints do this),
    * ``max_trades`` is reached (a falsy value means no limit).

    The monitor and WebSocket are always shut down afterwards, including on
    errors. A report is generated unless the session failed; a status set
    externally (e.g. ``'stopped'``) is preserved across report generation.

    Args:
        symbol: Trading pair to run.
        duration_hours: Maximum session length in hours.
        max_trades: Optional trade-count limit.
    """
    session_id = f"{symbol}_{int(time.time())}"
    log(f"πŸš€ Starting trading session for {symbol} (Duration: {duration_hours}h)")
    if symbol not in bot_instances:
        bot_instances[symbol] = create_bot_instance(symbol)
    bot = bot_instances[symbol]
    session = {
        'symbol': symbol,
        'start_time': datetime.now(),
        'end_time': datetime.now() + timedelta(hours=duration_hours),
        'duration_hours': duration_hours,
        'max_trades': max_trades,
        'trades_executed': 0,
        'pnl': 0.0,
        'status': 'running',
    }
    active_sessions[session_id] = session
    monitor_task = None
    try:
        try:
            # Start WebSocket
            ws_client = bot['ws_client']
            await ws_client.connect()
            await asyncio.sleep(2)
            await ws_client.subscribe_ticker([symbol])
            await ws_client.subscribe_kline([symbol], ["1", "5"])
            await ws_client.subscribe_orderbook([symbol], depth=25)
            await ws_client.subscribe_trades([symbol])
            # Start monitoring; the reference is kept so the task can be
            # reaped during shutdown.
            monitor_task = asyncio.create_task(bot['monitor'].start_monitoring())
            # Run for specified duration, measured from the end of setup.
            end_time = datetime.now() + timedelta(hours=duration_hours)
            risk_manager = bot['risk_manager']
            while datetime.now() < end_time:
                await asyncio.sleep(1)
                # A stop endpoint changed the status: end the loop now
                # instead of running on until the deadline.
                if session['status'] != 'running':
                    break
                # Update session stats
                session['pnl'] = risk_manager.daily_pnl
                # NOTE(review): this counts the risk manager's whole history,
                # which may include earlier sessions if the instance is reused.
                trades_count = len(risk_manager.trade_history)
                session['trades_executed'] = trades_count
                # Check trade limits against the live count.
                if max_trades and trades_count >= max_trades:
                    log(f"🎯 Max trades reached for {symbol} session")
                    break
        finally:
            await _shutdown_session_bot(bot, monitor_task, symbol)

        # Report generation may mark the session completed; remember an
        # externally set status/end time so it can be put back.
        status_before_report = session['status']
        end_before_report = session['end_time']
        await generate_session_report(session_id)
        if status_before_report != 'running':
            session['status'] = status_before_report
            session['end_time'] = end_before_report
    except Exception as e:
        log(f"❌ Error in trading session for {symbol}: {e}")
        session['status'] = 'error'
def _format_session_report(symbol, session, trades, risk_manager):
    """Render the text report for one session from a snapshot of its trades.

    Returns:
        tuple: ``(report_text, metrics)`` where ``metrics`` holds the total,
        winning and losing trade counts, the win rate and the summed P&L.
    """
    total_trades = len(trades)
    winning_trades = sum(1 for trade in trades if trade['pnl'] > 0)
    losing_trades = total_trades - winning_trades
    win_rate = winning_trades / total_trades if total_trades > 0 else 0
    total_pnl = sum(trade['pnl'] for trade in trades)

    report = f"""
πŸ† TRADING SESSION REPORT - {symbol}
═══════════════════════════════════════════
πŸ“Š Session Details:
β€’ Duration: {session['duration_hours']} hours
β€’ Start Time: {session['start_time'].strftime('%Y-%m-%d %H:%M:%S')}
β€’ End Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
πŸ“ˆ Performance Metrics:
β€’ Total Trades: {total_trades}
β€’ Winning Trades: {winning_trades}
β€’ Losing Trades: {losing_trades}
β€’ Win Rate: {win_rate:.1%}
β€’ Total P&L: ${total_pnl:.2f}
🎯 Risk Management:
β€’ Max Loss Limit: ${risk_manager.max_daily_loss}
β€’ Risk per Trade: {risk_manager.risk_per_trade*100}%
β€’ Leverage Used: {risk_manager.leverage}x
πŸ’Ž Trade Summary:
"""
    # Add individual trade details (last 10 trades only)
    for i, trade in enumerate(trades[-10:], 1):
        pnl_emoji = "🟒" if trade['pnl'] > 0 else "πŸ”΄"
        report += f"{i}. {trade['side']} @ ${trade['entry_price']:.2f} β†’ ${trade['exit_price']:.2f} {pnl_emoji} P&L: ${trade['pnl']:.2f}\n"
    report += f"\n═══════════════════════════════════════════"

    metrics = {
        'total_trades': total_trades,
        'win_rate': win_rate,
        'total_pnl': total_pnl,
        'winning_trades': winning_trades,
        'losing_trades': losing_trades,
    }
    return report, metrics


async def generate_session_report(session_id: str):
    """Generate, store and send the report for a trading session.

    Does nothing when ``session_id`` is unknown. When a bot instance exists
    for the session's symbol, the report is built from the risk manager's
    trade history, stored in ``session_reports`` and sent to Telegram.
    Finally a session that is still ``'running'`` is marked ``'completed'``
    with the current time as its end time; any other status (for example
    ``'stopped'`` set by a stop endpoint) is left untouched.

    Args:
        session_id: Key of the session in ``active_sessions``.
    """
    session = active_sessions.get(session_id)
    if session is None:
        return
    symbol = session['symbol']
    bot = bot_instances.get(symbol)
    if bot is not None:
        risk_manager = bot['risk_manager']
        # Snapshot the history so the stored report does not change when
        # further trades are appended later.
        trades = list(risk_manager.trade_history)
        report, metrics = _format_session_report(symbol, session, trades, risk_manager)
        # Save first so a delivery failure cannot lose the report.
        session_reports[session_id] = {
            'session': session,
            'metrics': metrics,
            'trades': trades,
            'report': report,
        }
        log(f"πŸ“Š Session report generated for {symbol}")
        # Send to Telegram; a failed delivery must not abort the bookkeeping below.
        try:
            send_telegram(report)
        except Exception as exc:
            log(f"Failed to send session report for {symbol}: {exc}")
    # Mark session as completed, without overwriting a terminal status that
    # was already set elsewhere.
    if session['status'] == 'running':
        session['status'] = 'completed'
        session['end_time'] = datetime.now()
@app.get("/")
async def root():
    """Return the service banner: API name, version and a fixed status flag."""
    banner = {
        "message": "Bybit Scalping Bot API",
        "version": "1.0.0",
        "status": "running",
    }
    return banner
# Captured once at import time so /status can report how long this process
# has been up.
_SERVER_STARTED_AT = datetime.now()


@app.get("/status")
async def get_status():
    """Get bot status.

    Returns:
        BotStatus: ``is_running`` is true only while at least one session has
        status ``'running'``; ``total_pnl`` and ``trades_today`` are summed
        over those running sessions; ``active_sessions`` lists every tracked
        session regardless of status; ``uptime`` is the time elapsed since
        this module was imported.
    """
    sessions = list(active_sessions.items())
    running = [session for _, session in sessions if session['status'] == 'running']

    return BotStatus(
        # Finished, stopped or failed records must not make the bot look active.
        is_running=bool(running),
        active_sessions=[
            {
                'session_id': sid,
                'symbol': session['symbol'],
                'status': session['status'],
                'start_time': session['start_time'].isoformat(),
                'pnl': session.get('pnl', 0),
                'trades': session.get('trades_executed', 0),
            }
            for sid, session in sessions
        ],
        total_pnl=sum(session.get('pnl', 0) for session in running),
        trades_today=sum(session.get('trades_executed', 0) for session in running),
        uptime=str(datetime.now() - _SERVER_STARTED_AT),
    )
@app.post("/start/{symbol}")
async def start_symbol_trading(symbol: str, background_tasks: BackgroundTasks, duration_hours: int = 18, max_trades: Optional[int] = None):
    """Start trading for a specific symbol.

    Args:
        symbol: Pair to trade; must be listed in the pairs configuration.
        background_tasks: FastAPI task queue the session is scheduled on.
        duration_hours: Session length in hours; must be at least 1.
        max_trades: Optional trade limit; must not be negative.

    Raises:
        HTTPException: 400 for invalid parameters, an unconfigured symbol,
            or when a session for the symbol is already running.
    """
    # Reject values that would start a session which ends immediately.
    if duration_hours < 1:
        raise HTTPException(status_code=400, detail="duration_hours must be at least 1")
    if max_trades is not None and max_trades < 0:
        raise HTTPException(status_code=400, detail="max_trades must not be negative")

    settings, configured_pairs = load_config()
    if symbol not in configured_pairs:
        raise HTTPException(status_code=400, detail=f"Symbol {symbol} not configured. Available: {configured_pairs}")
    # Check if already running
    already_running = any(
        session['symbol'] == symbol and session['status'] == 'running'
        for session in active_sessions.values()
    )
    if already_running:
        raise HTTPException(status_code=400, detail=f"Trading already active for {symbol}")
    # Start trading session (runs after the response has been sent)
    background_tasks.add_task(run_trading_session, symbol, duration_hours, max_trades)
    message = f"πŸš€ Started trading session for {symbol} ({duration_hours}h duration)"
    send_telegram(message)
    send_hf_notification(message, "session_start")
    log(message)
    return {"message": message, "symbol": symbol, "duration_hours": duration_hours}
@app.post("/stop/{symbol}")
async def stop_symbol_trading(symbol: str):
    """Halt every running session of ``symbol`` and shut its bot down.

    Running sessions for the symbol are flagged ``'stopped'`` and stamped
    with the current time. If a bot instance exists for the symbol, its
    monitor is stopped and its WebSocket closed. Responds with 404 when no
    running session matched.
    """
    halted_ids = []
    for sid, record in active_sessions.items():
        matches = record['symbol'] == symbol and record['status'] == 'running'
        if not matches:
            continue
        record['status'] = 'stopped'
        record['end_time'] = datetime.now()
        halted_ids.append(sid)

    if symbol in bot_instances:
        instance = bot_instances[symbol]
        instance['monitor'].stop_monitoring()
        await instance['ws_client'].close()

    if not halted_ids:
        raise HTTPException(status_code=404, detail=f"No active trading session found for {symbol}")

    message = f"πŸ›‘ Stopped trading for {symbol}"
    send_telegram(message)
    send_hf_notification(message, "session_stop")
    log(message)
    return {"message": message, "stopped_sessions": halted_ids}
@app.get("/sessions")
async def get_sessions():
    """Return the tracked session records together with the stored reports."""
    overview = {}
    overview["active_sessions"] = active_sessions
    overview["completed_sessions"] = session_reports
    return overview
@app.get("/report/{session_id}")
async def get_session_report(session_id: str):
    """Look up the stored report of one session; 404 if none exists."""
    if session_id not in session_reports:
        raise HTTPException(status_code=404, detail="Session report not found")
    return session_reports[session_id]
@app.get("/logs/analysis")
async def get_analysis_logs(lines: int = 50):
    """Get recent analysis logs.

    Scans ``logs/scalper.log`` for lines containing any analysis keyword and
    returns the last ``lines`` of them.

    Args:
        lines: Maximum number of matching lines to return. Zero or negative
            values yield an empty list.

    Returns:
        dict: ``logs``, ``count`` and ``total_analysis_logs``; or an
        ``error`` payload with empty ``logs`` when the file does not exist.

    Raises:
        HTTPException: 500 if the log file cannot be read.
    """
    keywords = ('SIGNAL', 'EMA', 'RSI', 'volume', 'orderbook', 'analysis', 'strategy')
    # A plain ``[-lines:]`` slice returns everything for 0 and slices from
    # the front for negatives; clamp and let the deque bound the tail.
    limit = max(lines, 0)
    log_file = "logs/scalper.log"
    try:
        if not os.path.exists(log_file):
            return {"error": "Log file not found", "logs": [], "count": 0}
        recent = deque(maxlen=limit)
        total_matches = 0
        # Stream the file so only the requested tail is held in memory;
        # undecodable bytes are replaced rather than failing the request.
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                # Filter for analysis-related logs
                if any(keyword in line for keyword in keywords):
                    total_matches += 1
                    recent.append(line)
        recent_lines = list(recent)
        return {
            "logs": recent_lines,
            "count": len(recent_lines),
            "total_analysis_logs": total_matches,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error reading logs: {e}") from e
@app.get("/logs/live")
async def get_live_logs(lines: int = 20):
    """Get the most recent lines of the log file.

    Args:
        lines: Maximum number of trailing lines to return. Zero or negative
            values yield an empty list.

    Returns:
        dict: ``logs``, ``count`` and ``latest_timestamp`` (the text before
        the first ``' - '`` of the last line, or None when there are no
        lines); or an ``error`` payload when the file does not exist.

    Raises:
        HTTPException: 500 if the log file cannot be read.
    """
    # A plain ``[-lines:]`` slice returns everything for 0 and slices from
    # the front for negatives; clamp and let the deque bound the tail.
    limit = max(lines, 0)
    log_file = "logs/scalper.log"
    try:
        if not os.path.exists(log_file):
            return {"error": "Log file not found", "logs": [], "count": 0}
        # A bounded deque keeps only the tail while the file is streamed;
        # undecodable bytes are replaced rather than failing the request.
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            recent_lines = list(deque(f, maxlen=limit))
        return {
            "logs": recent_lines,
            "count": len(recent_lines),
            "latest_timestamp": recent_lines[-1].split(' - ')[0] if recent_lines else None,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error reading logs: {e}") from e
@app.post("/start_all")
async def start_all_pairs(background_tasks: BackgroundTasks, duration_hours: int = 18, max_trades: Optional[int] = None):
    """Start trading sessions for all configured pairs.

    Pairs that already have a running session are reported under
    ``failed_sessions`` with the reason ``"Already running"``.

    Args:
        background_tasks: FastAPI task queue the sessions are scheduled on.
        duration_hours: Session length in hours; must be at least 1.
        max_trades: Optional per-session trade limit; must not be negative.

    Raises:
        HTTPException: 400 for invalid parameters.
    """
    # Reject values that would start sessions which end immediately.
    if duration_hours < 1:
        raise HTTPException(status_code=400, detail="duration_hours must be at least 1")
    if max_trades is not None and max_trades < 0:
        raise HTTPException(status_code=400, detail="max_trades must not be negative")

    settings, configured_pairs = load_config()
    # Symbols with a running session, collected once instead of per pair.
    running_symbols = {
        session['symbol']
        for session in active_sessions.values()
        if session['status'] == 'running'
    }
    started_sessions = []
    failed_sessions = []
    for symbol in configured_pairs:
        # Check if already running
        if symbol in running_symbols:
            failed_sessions.append({"symbol": symbol, "reason": "Already running"})
            continue
        try:
            # Start trading session
            background_tasks.add_task(run_trading_session, symbol, duration_hours, max_trades)
            started_sessions.append({"symbol": symbol, "duration_hours": duration_hours})
        except Exception as e:
            failed_sessions.append({"symbol": symbol, "reason": str(e)})
    message = f"πŸš€ Started trading sessions for {len(started_sessions)} pairs"
    if started_sessions:
        send_telegram(message)
        send_hf_notification(message, "bulk_start")
        log(message)
    return {
        "message": message,
        "started_sessions": started_sessions,
        "failed_sessions": failed_sessions,
        "total_started": len(started_sessions),
        "total_failed": len(failed_sessions),
    }
@app.post("/stop_all")
async def stop_all_pairs():
    """Halt every running session across all pairs.

    Each running session is flagged ``'stopped'``, stamped with the current
    time, and the bot instance of its symbol (if any) has its monitor
    stopped and its WebSocket closed.
    """
    halted_ids = []
    # Iterate over a snapshot because the loop body awaits.
    for sid, record in tuple(active_sessions.items()):
        if record['status'] != 'running':
            continue
        record['status'] = 'stopped'
        record['end_time'] = datetime.now()
        halted_ids.append(sid)
        # Stop the actual trading
        pair = record['symbol']
        if pair in bot_instances:
            instance = bot_instances[pair]
            instance['monitor'].stop_monitoring()
            await instance['ws_client'].close()

    if not halted_ids:
        return {"message": "No active sessions to stop", "stopped_sessions": []}

    message = f"πŸ›‘ Stopped all trading sessions ({len(halted_ids)} sessions)"
    send_telegram(message)
    send_hf_notification(message, "bulk_stop")
    log(message)
    return {"message": message, "stopped_sessions": halted_ids}
@app.get("/analysis/status")
async def get_analysis_status():
    """Get current analysis status for all active sessions.

    For every running session whose symbol has a bot instance, reports the
    latest price, the indicator values read from the data engine and the
    derived strategy conditions. Values that cannot be computed are reported
    as None.

    Returns:
        dict: ``analysis_status`` keyed by symbol, the number of entries as
        ``active_sessions``, and a ``last_updated`` ISO timestamp.
    """
    analysis_status = {}
    for session_id, session in list(active_sessions.items()):
        symbol = session['symbol']
        bot = bot_instances.get(symbol)
        if session['status'] != 'running' or bot is None:
            continue
        engine = bot['data_engine']

        # Get current market data. Best effort: a failure is reported as a
        # missing value, but only ordinary exceptions are swallowed.
        try:
            prices = engine.get_prices(symbol, limit=1)
            current_price = prices[-1] if prices else None
        except Exception:
            current_price = None

        # Get technical indicators (same best-effort policy).
        try:
            ema_fast = engine.calculate_ema(symbol, "1", 9)
            ema_slow = engine.calculate_ema(symbol, "1", 21)
            rsi = engine.calculate_rsi(symbol, "1", 14)
            volume_spike = engine.detect_volume_spike(symbol, "1", 1.3)
            orderbook_imbalance = engine.get_orderbook_imbalance(symbol)
        except Exception:
            ema_fast = ema_slow = rsi = volume_spike = orderbook_imbalance = None

        # Compare against None explicitly: a legitimate 0.0 reading must not
        # be treated as "indicator unavailable".
        have_emas = ema_fast is not None and ema_slow is not None
        analysis_status[symbol] = {
            "session_id": session_id,
            "current_price": current_price,
            "indicators": {
                "ema_9": ema_fast,
                "ema_21": ema_slow,
                "rsi_14": rsi,
                "volume_spike": volume_spike,
                "orderbook_imbalance": orderbook_imbalance,
            },
            "strategy_conditions": {
                "trend_up": bool(ema_fast > ema_slow) if have_emas else None,
                "rsi_valid": bool(40 <= rsi <= 70) if rsi is not None else None,
                "volume_confirmed": volume_spike,
                "orderbook_aligned": abs(orderbook_imbalance or 0) > 0.15,
            },
            "last_update": datetime.now().isoformat(),
        }
    return {
        "analysis_status": analysis_status,
        "active_sessions": len(analysis_status),
        "last_updated": datetime.now().isoformat(),
    }
@app.get("/notifications")
async def get_notifications(limit: int = 20):
    """Fetch up to ``limit`` recent notifications (works in HF Spaces).

    Any failure while fetching or counting them is reported as HTTP 500.
    """
    try:
        items = get_hf_notifications(limit)
        response = {
            "notifications": items,
            "count": len(items),
            "limit_requested": limit,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get notifications: {e}")
    return response
@app.post("/emergency_stop")
async def emergency_stop():
    """Emergency stop all trading.

    Marks every running session as ``'emergency_stop'``, stops the monitor
    of every bot instance, then closes every WebSocket client. A failure on
    one bot is logged and does not prevent the remaining bots from being
    halted.

    Returns:
        dict: The notification message and the ids of the sessions stopped.
    """
    stopped_sessions = []
    for session_id, session in list(active_sessions.items()):
        if session['status'] == 'running':
            session['status'] = 'emergency_stop'
            session['end_time'] = datetime.now()
            stopped_sessions.append(session_id)

    # Snapshot the instances: the loop below awaits, and another task could
    # add a bot meanwhile, which would break iteration over the live dict.
    bots = list(bot_instances.values())

    # Stop all monitors first (synchronous) so trading halts everywhere
    # before any time is spent closing sockets.
    for bot in bots:
        try:
            bot['monitor'].stop_monitoring()
        except Exception as exc:
            log(f"Emergency stop: failed to stop monitor for {bot.get('symbol')}: {exc}")
    for bot in bots:
        try:
            await bot['ws_client'].close()
        except Exception as exc:
            log(f"Emergency stop: failed to close WebSocket for {bot.get('symbol')}: {exc}")

    message = f"🚨 EMERGENCY STOP activated - All trading halted"
    send_telegram(message)
    send_hf_notification(message, "emergency")
    log(message)
    return {"message": message, "stopped_sessions": stopped_sessions}
# Script entry point: log a startup message, then serve the app with uvicorn
# on all interfaces, port 8000.
if __name__ == "__main__":
    log("πŸš€ Starting FastAPI server for Bybit Scalping Bot")
    uvicorn.run(app, host="0.0.0.0", port=8000)