"""FastAPI control surface for the Bybit scalping bot.

Exposes HTTP endpoints to start and stop per-symbol trading sessions, inspect
session state and indicator snapshots, read recent log lines, and trigger an
emergency stop. All state lives in module-level dictionaries, so it is local
to a single server process.
"""

import asyncio
import os
import threading
import time
from collections import deque
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import uvicorn
import yaml
from dotenv import load_dotenv
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel

from core.exchange import BybitExchange
from core.strategy import ScalpingStrategy
from core.risk import RiskManager
from core.data_engine import DataEngine
from core.trade_monitor import TradeMonitor
from core.websockets import BybitWebSocket
from services.logger import log, log_trade, log_performance
from services.telegram import send_telegram
from services.notifications import send_hf_notification, get_hf_notifications

load_dotenv()

app = FastAPI(title="Bybit Scalping Bot API", version="1.0.0")

# Moment this module was loaded; used as the reference point for /status uptime.
_SERVER_STARTED_AT = datetime.now()

# Log file read by the /logs/* endpoints.
_LOG_FILE = "logs/scalper.log"

# A log line counts as "analysis-related" when it contains any of these.
_ANALYSIS_KEYWORDS = (
    'SIGNAL', 'EMA', 'RSI', 'volume', 'orderbook', 'analysis', 'strategy',
)


class TradeSession(BaseModel):
    """Parameters describing a requested trading session."""

    symbol: str
    duration_hours: int = 18
    max_trades: Optional[int] = None


class BotStatus(BaseModel):
    """Response model returned by the /status endpoint."""

    is_running: bool
    active_sessions: List[Dict]
    total_pnl: float
    trades_today: int
    uptime: str


# Global bot state
bot_instances = {}    # symbol -> component dict built by create_bot_instance
active_sessions = {}  # session_id -> mutable session record
session_reports = {}  # session_id -> generated report payload


def load_config():
    """Load the bot settings and the list of configured trading pairs.

    Returns:
        A ``(settings, pairs)`` tuple: the parsed ``config/settings.yaml`` and
        the value under the ``pairs`` key of ``config/pairs.yaml``.

    Raises:
        OSError: if either file cannot be opened.
        KeyError: if ``config/pairs.yaml`` has no ``pairs`` key.
    """
    # Context managers guarantee the handles are closed even if parsing fails.
    with open("config/settings.yaml", encoding="utf-8") as settings_file:
        settings = yaml.safe_load(settings_file)
    with open("config/pairs.yaml", encoding="utf-8") as pairs_file:
        pairs = yaml.safe_load(pairs_file)["pairs"]
    return settings, pairs


def create_bot_instance(symbol: str):
    """Create a dedicated bot instance for a symbol.

    Wires an exchange client, data engine, strategy, risk manager, trade
    monitor and WebSocket client together and returns them in a dict along
    with bookkeeping fields (``symbol``, ``start_time``, ``trades``, ``pnl``).
    """
    exchange = BybitExchange()
    data_engine = DataEngine()
    strategy = ScalpingStrategy(data_engine)
    risk_manager = RiskManager(exchange)
    monitor = TradeMonitor(exchange, strategy, risk_manager, data_engine)
    # Every message from this client is routed to the handler for `symbol`.
    ws_client = BybitWebSocket(callback=lambda data: handle_ws_data(data, symbol))

    return {
        'exchange': exchange,
        'data_engine': data_engine,
        'strategy': strategy,
        'risk_manager': risk_manager,
        'monitor': monitor,
        'ws_client': ws_client,
        'symbol': symbol,
        'start_time': datetime.now(),
        'trades': [],
        'pnl': 0.0
    }


def handle_ws_data(data, symbol):
    """Handle WebSocket data for specific symbol.

    Forwards ``tickers.<symbol>`` messages to the data engine as price updates
    and ``kline.<interval>.<symbol>`` messages as candle updates. Messages for
    other symbols, without a topic or payload, or for an unknown bot are
    ignored.
    """
    bot = bot_instances.get(symbol)
    if bot is None or "topic" not in data:
        return

    topic = data["topic"]
    payload = data.get("data")
    if payload is None:
        return

    parts = topic.split(".")
    engine = bot['data_engine']

    if topic.startswith("tickers."):
        if parts[1] != symbol:
            return
        # Skip ticker messages that carry no last price instead of raising.
        last_price = payload.get("lastPrice")
        if last_price is not None:
            engine.update_price(symbol, float(last_price))
    elif topic.startswith("kline."):
        # Expected shape is kline.<interval>.<symbol>; ignore anything shorter.
        if len(parts) < 3:
            return
        interval, ticker_symbol = parts[1], parts[2]
        if ticker_symbol != symbol:
            return
        for candle in payload:
            candle_data = {
                'timestamp': candle['start'],
                'open': float(candle['open']),
                'high': float(candle['high']),
                'low': float(candle['low']),
                'close': float(candle['close']),
                'volume': float(candle['volume'])
            }
            engine.update_candle(symbol, interval, candle_data)


async def _shutdown_bot(bot: Dict) -> None:
    """Stop the bot's trade monitor, then close its WebSocket client."""
    bot['monitor'].stop_monitoring()
    await bot['ws_client'].close()


async def run_trading_session(symbol: str, duration_hours: int, max_trades: Optional[int] = None):
    """Run a trading session for a specific symbol.

    Registers a session record, connects and subscribes the symbol's WebSocket
    client, starts the trade monitor, then polls once per second until one of:
    the duration elapses, ``max_trades`` trades have been recorded since the
    session started, or the session status is changed by another endpoint.
    A report is generated on exit.

    Args:
        symbol: Trading pair to run.
        duration_hours: Maximum session length in hours.
        max_trades: Optional cap on trades recorded during this session.
    """
    session_id = f"{symbol}_{int(time.time())}"

    log(f"šŸš€ Starting trading session for {symbol} (Duration: {duration_hours}h)")

    if symbol not in bot_instances:
        bot_instances[symbol] = create_bot_instance(symbol)

    bot = bot_instances[symbol]
    risk_manager = bot['risk_manager']

    started_at = datetime.now()
    end_time = started_at + timedelta(hours=duration_hours)
    session = {
        'symbol': symbol,
        'start_time': started_at,
        'end_time': end_time,
        'duration_hours': duration_hours,
        'max_trades': max_trades,
        'trades_executed': 0,
        'pnl': 0.0,
        'status': 'running'
    }
    active_sessions[session_id] = session

    # Bot instances are reused across sessions, so the trade history may
    # already hold entries; the max_trades limit counts from this baseline.
    trades_at_start = len(risk_manager.trade_history)

    try:
        # Start WebSocket
        await bot['ws_client'].connect()
        await asyncio.sleep(2)
        await bot['ws_client'].subscribe_ticker([symbol])
        await bot['ws_client'].subscribe_kline([symbol], ["1", "5"])
        await bot['ws_client'].subscribe_orderbook([symbol], depth=25)
        await bot['ws_client'].subscribe_trades([symbol])

        # Start monitoring. The reference is kept for the lifetime of this
        # coroutine so the task is not garbage-collected while it runs.
        monitor_task = asyncio.create_task(bot['monitor'].start_monitoring())

        # Run for specified duration
        while datetime.now() < end_time:
            await asyncio.sleep(1)

            # /stop, /stop_all and /emergency_stop flip the status; honour it.
            if session['status'] != 'running':
                break

            # Update session stats
            session['pnl'] = risk_manager.daily_pnl
            session['trades_executed'] = len(risk_manager.trade_history)

            # Check trade limits
            trades_count = session['trades_executed'] - trades_at_start
            if max_trades and trades_count >= max_trades:
                log(f"šŸŽÆ Max trades reached for {symbol} session")
                break

        # A stop endpoint has already shut the bot down when it changed the
        # status, so only do it here on a natural end of the session.
        if session['status'] == 'running':
            await _shutdown_bot(bot)

        # Generate report
        await generate_session_report(session_id)

    except Exception as e:
        log(f"āŒ Error in trading session for {symbol}: {e}")
        session['status'] = 'error'
        # Do not leave the monitor trading or the socket open after a failure.
        try:
            await _shutdown_bot(bot)
        except Exception as cleanup_error:
            log(f"Cleanup after failed session for {symbol} also failed: {cleanup_error}")


async def generate_session_report(session_id: str):
    """Generate and send trading session report.

    Builds a text summary from the symbol's risk-manager trade history, sends
    it via Telegram, and stores it in ``session_reports``. A session still
    marked ``running`` is then marked ``completed``; a status set by a stop
    endpoint is left untouched. Unknown session ids are ignored.
    """
    if session_id not in active_sessions:
        return

    session = active_sessions[session_id]
    symbol = session['symbol']

    if symbol in bot_instances:
        risk_manager = bot_instances[symbol]['risk_manager']
        history = risk_manager.trade_history

        # Calculate session metrics
        total_trades = len(history)
        winning_trades = sum(1 for trade in history if trade['pnl'] > 0)
        # Break-even trades (pnl == 0) are counted as losing.
        losing_trades = total_trades - winning_trades
        win_rate = winning_trades / total_trades if total_trades > 0 else 0
        total_pnl = sum(trade['pnl'] for trade in history)

        # Generate report
        rule = "═══════════════════════════════════════════"
        header_lines = [
            "",
            f"šŸ† TRADING SESSION REPORT - {symbol}",
            rule,
            "",
            "šŸ“Š Session Details:",
            f"• Duration: {session['duration_hours']} hours",
            f"• Start Time: {session['start_time'].strftime('%Y-%m-%d %H:%M:%S')}",
            f"• End Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "",
            "šŸ“ˆ Performance Metrics:",
            f"• Total Trades: {total_trades}",
            f"• Winning Trades: {winning_trades}",
            f"• Losing Trades: {losing_trades}",
            f"• Win Rate: {win_rate:.1%}",
            f"• Total P&L: ${total_pnl:.2f}",
            "",
            "šŸŽÆ Risk Management:",
            f"• Max Loss Limit: ${risk_manager.max_daily_loss}",
            f"• Risk per Trade: {risk_manager.risk_per_trade*100}%",
            f"• Leverage Used: {risk_manager.leverage}x",
            "",
            "šŸ’Ž Trade Summary:",
            "",
        ]

        # Add individual trade details
        trade_lines = []
        for i, trade in enumerate(history[-10:], 1):  # Last 10 trades
            pnl_emoji = "🟢" if trade['pnl'] > 0 else "šŸ”“"
            trade_lines.append(
                f"{i}. {trade['side']} @ ${trade['entry_price']:.2f} → "
                f"${trade['exit_price']:.2f} {pnl_emoji} P&L: ${trade['pnl']:.2f}\n"
            )

        report = "\n".join(header_lines) + "".join(trade_lines) + f"\n{rule}"

        # Send to Telegram
        send_telegram(report)

        # Save report
        session_reports[session_id] = {
            'session': session,
            'metrics': {
                'total_trades': total_trades,
                'win_rate': win_rate,
                'total_pnl': total_pnl,
                'winning_trades': winning_trades,
                'losing_trades': losing_trades
            },
            'trades': history,
            'report': report
        }

        log(f"šŸ“Š Session report generated for {symbol}")

    # Mark session as completed, but keep 'stopped' / 'emergency_stop' (and
    # the end time recorded with them) when an endpoint ended the session.
    if session['status'] == 'running':
        session['status'] = 'completed'
        session['end_time'] = datetime.now()


@app.get("/")
async def root():
    """API root endpoint"""
    return {"message": "Bybit Scalping Bot API", "version": "1.0.0", "status": "running"}


@app.get("/status")
async def get_status():
    """Get bot status.

    Totals cover running sessions only; the session list includes every
    recorded session regardless of status.
    """
    running = [s for s in active_sessions.values() if s['status'] == 'running']
    total_pnl = sum(s.get('pnl', 0) for s in running)
    total_trades = sum(s.get('trades_executed', 0) for s in running)

    return BotStatus(
        is_running=bool(running),
        active_sessions=[
            {
                'session_id': sid,
                'symbol': session['symbol'],
                'status': session['status'],
                'start_time': session['start_time'].isoformat(),
                'pnl': session.get('pnl', 0),
                'trades': session.get('trades_executed', 0)
            }
            for sid, session in active_sessions.items()
        ],
        total_pnl=total_pnl,
        trades_today=total_trades,
        # Time elapsed since this module was loaded.
        uptime=str(datetime.now() - _SERVER_STARTED_AT)
    )


@app.post("/start/{symbol}")
async def start_symbol_trading(symbol: str, background_tasks: BackgroundTasks,
                               duration_hours: int = 18, max_trades: Optional[int] = None):
    """Start trading for a specific symbol.

    Raises:
        HTTPException: 400 if the symbol is not configured or a session for it
            is already running.
    """
    settings, configured_pairs = load_config()

    if symbol not in configured_pairs:
        raise HTTPException(
            status_code=400,
            detail=f"Symbol {symbol} not configured. Available: {configured_pairs}")

    # Check if already running
    for session in active_sessions.values():
        if session['symbol'] == symbol and session['status'] == 'running':
            raise HTTPException(status_code=400, detail=f"Trading already active for {symbol}")

    # Start trading session
    background_tasks.add_task(run_trading_session, symbol, duration_hours, max_trades)

    message = f"šŸš€ Started trading session for {symbol} ({duration_hours}h duration)"
    send_telegram(message)
    send_hf_notification(message, "session_start")
    log(message)

    return {"message": message, "symbol": symbol, "duration_hours": duration_hours}


@app.post("/stop/{symbol}")
async def stop_symbol_trading(symbol: str):
    """Stop trading for a specific symbol.

    Raises:
        HTTPException: 404 if no running session exists for the symbol.
    """
    stopped_sessions = []

    for session_id, session in active_sessions.items():
        if session['symbol'] == symbol and session['status'] == 'running':
            session['status'] = 'stopped'
            session['end_time'] = datetime.now()
            stopped_sessions.append(session_id)

    if symbol in bot_instances:
        await _shutdown_bot(bot_instances[symbol])

    if not stopped_sessions:
        raise HTTPException(status_code=404, detail=f"No active trading session found for {symbol}")

    message = f"šŸ›‘ Stopped trading for {symbol}"
    send_telegram(message)
    send_hf_notification(message, "session_stop")
    log(message)
    return {"message": message, "stopped_sessions": stopped_sessions}


@app.get("/sessions")
async def get_sessions():
    """Get all trading sessions"""
    return {
        "active_sessions": active_sessions,
        "completed_sessions": session_reports
    }


@app.get("/report/{session_id}")
async def get_session_report(session_id: str):
    """Get detailed report for a specific session.

    Raises:
        HTTPException: 404 if no report is stored for ``session_id``.
    """
    if session_id not in session_reports:
        raise HTTPException(status_code=404, detail="Session report not found")
    return session_reports[session_id]


def _tail_log(path: str, limit: int, keywords=None):
    """Return the last ``limit`` matching lines of a file and the match count.

    The file is streamed, so only the retained tail is held in memory. When
    ``keywords`` is None every line matches; otherwise a line matches if it
    contains any keyword. A non-positive ``limit`` yields an empty tail.
    """
    tail = deque(maxlen=max(limit, 0))
    matched = 0
    # errors="replace" keeps one undecodable byte from failing the whole read.
    with open(path, 'r', encoding="utf-8", errors="replace") as f:
        for line in f:
            if keywords is None or any(keyword in line for keyword in keywords):
                matched += 1
                tail.append(line)
    return list(tail), matched


@app.get("/logs/analysis")
async def get_analysis_logs(lines: int = 50):
    """Get recent analysis logs.

    Returns the last ``lines`` log lines containing an analysis keyword, plus
    the total number of matching lines in the file.
    """
    try:
        recent_lines, total_matches = _tail_log(_LOG_FILE, lines, _ANALYSIS_KEYWORDS)
    except FileNotFoundError:
        return {"error": "Log file not found", "logs": [], "count": 0}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error reading logs: {e}") from e

    return {
        "logs": recent_lines,
        "count": len(recent_lines),
        "total_analysis_logs": total_matches
    }


@app.get("/logs/live")
async def get_live_logs(lines: int = 20):
    """Get recent live logs (streaming).

    Returns the last ``lines`` log lines and the text before the first
    `` - `` separator of the newest line as ``latest_timestamp``.
    """
    try:
        recent_lines, _ = _tail_log(_LOG_FILE, lines)
    except FileNotFoundError:
        return {"error": "Log file not found", "logs": [], "count": 0}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error reading logs: {e}") from e

    return {
        "logs": recent_lines,
        "count": len(recent_lines),
        "latest_timestamp": recent_lines[-1].split(' - ')[0] if recent_lines else None
    }


@app.post("/start_all")
async def start_all_pairs(background_tasks: BackgroundTasks, duration_hours: int = 18,
                          max_trades: Optional[int] = None):
    """Start trading sessions for all configured pairs"""
    settings, configured_pairs = load_config()

    started_sessions = []
    failed_sessions = []

    for symbol in configured_pairs:
        # Check if already running
        already_running = any(
            session['symbol'] == symbol and session['status'] == 'running'
            for session in active_sessions.values()
        )

        if already_running:
            failed_sessions.append({"symbol": symbol, "reason": "Already running"})
            continue

        try:
            # Start trading session
            background_tasks.add_task(run_trading_session, symbol, duration_hours, max_trades)
            started_sessions.append({"symbol": symbol, "duration_hours": duration_hours})
        except Exception as e:
            failed_sessions.append({"symbol": symbol, "reason": str(e)})

    message = f"šŸš€ Started trading sessions for {len(started_sessions)} pairs"
    if started_sessions:
        send_telegram(message)
        send_hf_notification(message, "bulk_start")
        log(message)

    return {
        "message": message,
        "started_sessions": started_sessions,
        "failed_sessions": failed_sessions,
        "total_started": len(started_sessions),
        "total_failed": len(failed_sessions)
    }


@app.post("/stop_all")
async def stop_all_pairs():
    """Stop trading sessions for all pairs"""
    stopped_sessions = []

    # Iterate over a snapshot: the loop awaits, and a starting session may
    # add entries to active_sessions in the meantime.
    for session_id, session in list(active_sessions.items()):
        if session['status'] != 'running':
            continue

        session['status'] = 'stopped'
        session['end_time'] = datetime.now()
        stopped_sessions.append(session_id)

        # Stop the actual trading
        if session['symbol'] in bot_instances:
            await _shutdown_bot(bot_instances[session['symbol']])

    if not stopped_sessions:
        return {"message": "No active sessions to stop", "stopped_sessions": []}

    message = f"šŸ›‘ Stopped all trading sessions ({len(stopped_sessions)} sessions)"
    send_telegram(message)
    send_hf_notification(message, "bulk_stop")
    log(message)
    return {"message": message, "stopped_sessions": stopped_sessions}


@app.get("/analysis/status")
async def get_analysis_status():
    """Get current analysis status for all active sessions.

    For each running session, reports the latest price, indicator values and
    derived strategy conditions. Values that cannot be computed are reported
    as None.
    """
    analysis_status = {}

    for session_id, session in active_sessions.items():
        symbol = session['symbol']
        if session['status'] != 'running' or symbol not in bot_instances:
            continue

        engine = bot_instances[symbol]['data_engine']

        # Get current market data
        try:
            prices = engine.get_prices(symbol, limit=1)
            current_price = prices[-1] if prices else None
        except Exception:
            current_price = None

        # Get technical indicators
        try:
            ema_fast = engine.calculate_ema(symbol, "1", 9)
            ema_slow = engine.calculate_ema(symbol, "1", 21)
            rsi = engine.calculate_rsi(symbol, "1", 14)
            volume_spike = engine.detect_volume_spike(symbol, "1", 1.3)
            orderbook_imbalance = engine.get_orderbook_imbalance(symbol)
        except Exception:
            ema_fast = ema_slow = rsi = volume_spike = orderbook_imbalance = None

        # Compare against None explicitly so that a legitimate value of 0
        # is evaluated rather than reported as missing.
        have_emas = ema_fast is not None and ema_slow is not None

        analysis_status[symbol] = {
            "session_id": session_id,
            "current_price": current_price,
            "indicators": {
                "ema_9": ema_fast,
                "ema_21": ema_slow,
                "rsi_14": rsi,
                "volume_spike": volume_spike,
                "orderbook_imbalance": orderbook_imbalance
            },
            "strategy_conditions": {
                "trend_up": ema_fast > ema_slow if have_emas else None,
                "rsi_valid": 40 <= rsi <= 70 if rsi is not None else None,
                "volume_confirmed": volume_spike,
                "orderbook_aligned": abs(orderbook_imbalance or 0) > 0.15
            },
            "last_update": datetime.now().isoformat()
        }

    return {
        "analysis_status": analysis_status,
        "active_sessions": len(analysis_status),
        "last_updated": datetime.now().isoformat()
    }


@app.get("/notifications")
async def get_notifications(limit: int = 20):
    """Get recent notifications (works in HF Spaces)"""
    try:
        notifications = get_hf_notifications(limit)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get notifications: {e}") from e

    return {
        "notifications": notifications,
        "count": len(notifications),
        "limit_requested": limit
    }


@app.post("/emergency_stop")
async def emergency_stop():
    """Emergency stop all trading.

    Marks every running session as ``emergency_stop``, stops every monitor,
    then closes every WebSocket client on a best-effort basis.
    """
    stopped_sessions = []

    for session_id, session in active_sessions.items():
        if session['status'] == 'running':
            session['status'] = 'emergency_stop'
            session['end_time'] = datetime.now()
            stopped_sessions.append(session_id)

    # Snapshot the bots: closing sockets awaits, and bot_instances may gain
    # entries while this coroutine is suspended.
    bots = list(bot_instances.values())

    # Stop all monitors first, synchronously, so no monitor keeps running
    # while sockets are still being closed.
    for bot in bots:
        bot['monitor'].stop_monitoring()

    for bot in bots:
        try:
            await bot['ws_client'].close()
        except Exception as e:
            # One failing socket must not prevent the others from closing.
            log(f"Failed to close WebSocket for {bot['symbol']} during emergency stop: {e}")

    message = f"🚨 EMERGENCY STOP activated - All trading halted"
    send_telegram(message)
    send_hf_notification(message, "emergency")
    log(message)

    return {"message": message, "stopped_sessions": stopped_sessions}


if __name__ == "__main__":
    log("šŸš€ Starting FastAPI server for Bybit Scalping Bot")
    uvicorn.run(app, host="0.0.0.0", port=8000)