Spaces:
Paused
Paused
| from fastapi import FastAPI, BackgroundTasks, HTTPException | |
| from pydantic import BaseModel | |
| import uvicorn | |
| import asyncio | |
| import threading | |
| import time | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional | |
| import yaml | |
| import os | |
| from dotenv import load_dotenv | |
| from core.exchange import BybitExchange | |
| from core.strategy import ScalpingStrategy | |
| from core.risk import RiskManager | |
| from core.data_engine import DataEngine | |
| from core.trade_monitor import TradeMonitor | |
| from core.websockets import BybitWebSocket | |
| from services.logger import log, log_trade, log_performance | |
| from services.telegram import send_telegram | |
| from services.notifications import send_hf_notification, get_hf_notifications | |
# Load variables from a .env file into the process environment.
load_dotenv()

# FastAPI application object; it is passed to uvicorn in the __main__ guard.
app = FastAPI(
    title="Bybit Scalping Bot API",
    version="1.0.0",
)
class TradeSession(BaseModel):
    """Parameters describing one trading session: symbol, length and trade cap."""

    # Trading pair the session applies to.
    symbol: str
    # Session length in hours (18 when omitted).
    duration_hours: int = 18
    # Optional upper bound on the number of trades; None when no bound is given.
    max_trades: Optional[int] = None
class BotStatus(BaseModel):
    """Status snapshot built and returned by ``get_status``."""

    # Whether the bot is considered to be running.
    is_running: bool
    # One summary dict per entry in the session registry.
    active_sessions: List[Dict]
    # P&L summed over the sessions included in the snapshot.
    total_pnl: float
    # Trade count summed over the sessions included in the snapshot.
    trades_today: int
    # Elapsed time rendered as a string.
    uptime: str
# Global bot state (process-local, in-memory only)
# symbol -> component bundle produced by create_bot_instance()
bot_instances: Dict[str, Dict] = {}
# session id -> mutable session record written by run_trading_session()
active_sessions: Dict[str, Dict] = {}
# session id -> report payload stored by generate_session_report()
session_reports: Dict[str, Dict] = {}
def load_config():
    """Load bot settings and the list of tradable pairs from the YAML config files.

    Returns:
        tuple: ``(settings, pairs)`` where ``settings`` is the parsed content
        of ``config/settings.yaml`` and ``pairs`` is the value stored under
        the ``"pairs"`` key of ``config/pairs.yaml``.

    Raises:
        OSError: if either file cannot be opened.
        KeyError: if ``config/pairs.yaml`` has no ``"pairs"`` key.
    """
    # Context managers close both handles deterministically; the previous
    # bare open() calls left them to the garbage collector.
    with open("config/settings.yaml") as settings_file:
        settings = yaml.safe_load(settings_file)
    with open("config/pairs.yaml") as pairs_file:
        pairs = yaml.safe_load(pairs_file)["pairs"]
    return settings, pairs
def create_bot_instance(symbol: str):
    """Assemble the set of components used to trade one symbol.

    Args:
        symbol: Trading pair the bundle is dedicated to.

    Returns:
        dict: the exchange, data engine, strategy, risk manager, trade monitor
        and websocket client, plus the symbol, creation time, an empty trade
        list and a zero P&L.
    """
    # The result is not used here; the call is kept because it reads (and
    # therefore validates the presence of) both config files.
    load_config()

    exchange = BybitExchange()
    data_engine = DataEngine()
    strategy = ScalpingStrategy(data_engine)
    risk_manager = RiskManager(exchange)
    monitor = TradeMonitor(exchange, strategy, risk_manager, data_engine)

    def _forward(data):
        # Bind this bundle's symbol to every incoming websocket message.
        return handle_ws_data(data, symbol)

    ws_client = BybitWebSocket(callback=_forward)

    return dict(
        exchange=exchange,
        data_engine=data_engine,
        strategy=strategy,
        risk_manager=risk_manager,
        monitor=monitor,
        ws_client=ws_client,
        symbol=symbol,
        start_time=datetime.now(),
        trades=[],
        pnl=0.0,
    )
def handle_ws_data(data, symbol):
    """Route one websocket message into the data engine of ``symbol``'s bot.

    Messages are ignored when no bot is registered for ``symbol``, when they
    carry no ``"topic"``/``"data"``, or when the topic names another symbol.

    Args:
        data: Decoded websocket message (a dict).
        symbol: Symbol whose bot instance should receive the update.

    Handled topics:
        ``tickers.<symbol>``: forwards ``lastPrice`` to ``update_price``.
        ``kline.<interval>.<symbol>``: forwards each candle to ``update_candle``.
    """
    bot = bot_instances.get(symbol)
    if bot is None or "topic" not in data:
        return

    topic = data["topic"]
    payload = data.get("data")
    if payload is None:
        # Control frames and acks may carry a topic without a data section.
        return

    engine = bot['data_engine']

    if topic.startswith("tickers."):
        if topic.split(".")[1] != symbol:
            return
        # Ticker updates may be deltas that omit unchanged fields (see the
        # Bybit v5 public ticker docs), so lastPrice is not guaranteed.
        last_price = payload.get("lastPrice") if isinstance(payload, dict) else None
        if last_price is not None:
            engine.update_price(symbol, float(last_price))
    elif topic.startswith("kline."):
        parts = topic.split(".")
        # Expected shape is kline.<interval>.<symbol>; skip anything shorter.
        if len(parts) < 3 or parts[2] != symbol:
            return
        interval = parts[1]
        for candle in payload:
            engine.update_candle(symbol, interval, {
                'timestamp': candle['start'],
                'open': float(candle['open']),
                'high': float(candle['high']),
                'low': float(candle['low']),
                'close': float(candle['close']),
                'volume': float(candle['volume']),
            })
async def run_trading_session(symbol: str, duration_hours: int, max_trades: Optional[int] = None):
    """Run one trading session for ``symbol`` until it expires, is stopped, or hits its trade cap.

    The session is registered in ``active_sessions`` under
    ``"<symbol>_<unix time>"``, the symbol's bot instance is created on first
    use, market-data streams are subscribed, and the trade monitor is started.
    Once per second the session record is refreshed from the risk manager.

    The loop ends when the duration has elapsed, when another coroutine
    changes the session status away from ``'running'``, or when
    ``max_trades`` is reached. The monitor and websocket are then shut down
    and, unless an error occurred, a session report is generated.

    Args:
        symbol: Trading pair to trade.
        duration_hours: Maximum session length in hours.
        max_trades: Optional trade cap; falsy values disable the cap.

    Errors are logged and recorded as ``status == 'error'`` on the session
    rather than raised.
    """
    session_id = f"{symbol}_{int(time.time())}"
    log(f"π Starting trading session for {symbol} (Duration: {duration_hours}h)")

    if symbol not in bot_instances:
        bot_instances[symbol] = create_bot_instance(symbol)
    bot = bot_instances[symbol]

    session = {
        'symbol': symbol,
        'start_time': datetime.now(),
        'end_time': datetime.now() + timedelta(hours=duration_hours),
        'duration_hours': duration_hours,
        'max_trades': max_trades,
        'trades_executed': 0,
        'pnl': 0.0,
        'status': 'running'
    }
    active_sessions[session_id] = session

    try:
        try:
            # Start WebSocket
            await bot['ws_client'].connect()
            await asyncio.sleep(2)
            await bot['ws_client'].subscribe_ticker([symbol])
            await bot['ws_client'].subscribe_kline([symbol], ["1", "5"])
            await bot['ws_client'].subscribe_orderbook([symbol], depth=25)
            await bot['ws_client'].subscribe_trades([symbol])

            # Start monitoring. The reference is kept for the lifetime of the
            # session because the event loop only holds weak references to tasks.
            monitor_task = asyncio.create_task(bot['monitor'].start_monitoring())  # noqa: F841

            # Run for specified duration
            end_time = datetime.now() + timedelta(hours=duration_hours)
            while datetime.now() < end_time:
                await asyncio.sleep(1)

                # The stop endpoints flip the status; honour that instead of
                # sleeping until the original deadline.
                if session['status'] != 'running':
                    log(f"Session {session_id} is no longer marked running; leaving trading loop")
                    break

                # Update session stats
                session['pnl'] = bot['risk_manager'].daily_pnl
                # NOTE(review): trade_history belongs to the per-symbol bot
                # instance, which is reused across sessions - confirm whether
                # it is reset between sessions.
                session['trades_executed'] = len(bot['risk_manager'].trade_history)

                # Check trade limits against the live count (previously a
                # local counter that was never incremented).
                if max_trades and session['trades_executed'] >= max_trades:
                    log(f"π― Max trades reached for {symbol} session")
                    break
        finally:
            # Always stop trading components, including on errors and
            # cancellation, so a failed session cannot keep trading.
            try:
                bot['monitor'].stop_monitoring()
            except Exception as cleanup_error:
                log(f"Failed to stop monitor for {symbol}: {cleanup_error}")
            try:
                await bot['ws_client'].close()
            except Exception as cleanup_error:
                log(f"Failed to close websocket for {symbol}: {cleanup_error}")

        # Generate report
        await generate_session_report(session_id)
    except Exception as e:
        log(f"β Error in trading session for {symbol}: {e}")
        session['status'] = 'error'
async def generate_session_report(session_id: str):
    """Build, store and send the report for a trading session.

    Does nothing when ``session_id`` is not in ``active_sessions``. When the
    session's symbol has a bot instance, metrics are computed from the risk
    manager's trade history, the report is saved in ``session_reports`` and
    then sent via Telegram. A session still marked ``'running'`` is finally
    marked ``'completed'``; a status set elsewhere (e.g. ``'stopped'``) is
    left untouched.

    Args:
        session_id: Key of the session in ``active_sessions``.
    """
    session = active_sessions.get(session_id)
    if session is None:
        return

    symbol = session['symbol']
    bot = bot_instances.get(symbol)
    if bot is not None:
        risk_manager = bot['risk_manager']
        # Snapshot the history so the stored report does not change when
        # later trades are appended to the live list.
        trades = list(risk_manager.trade_history)

        # Calculate session metrics
        total_trades = len(trades)
        winning_trades = sum(1 for trade in trades if trade['pnl'] > 0)
        losing_trades = total_trades - winning_trades
        win_rate = winning_trades / total_trades if total_trades > 0 else 0
        total_pnl = sum(trade['pnl'] for trade in trades)

        # Generate report
        report = f"""
π TRADING SESSION REPORT - {symbol}
βββββββββββββββββββββββββββββββββββββββββββ
π Session Details:
β’ Duration: {session['duration_hours']} hours
β’ Start Time: {session['start_time'].strftime('%Y-%m-%d %H:%M:%S')}
β’ End Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
π Performance Metrics:
β’ Total Trades: {total_trades}
β’ Winning Trades: {winning_trades}
β’ Losing Trades: {losing_trades}
β’ Win Rate: {win_rate:.1%}
β’ Total P&L: ${total_pnl:.2f}
π― Risk Management:
β’ Max Loss Limit: ${risk_manager.max_daily_loss}
β’ Risk per Trade: {risk_manager.risk_per_trade*100}%
β’ Leverage Used: {risk_manager.leverage}x
π Trade Summary:
"""
        # Add individual trade details (last 10 trades)
        trade_lines = []
        for i, trade in enumerate(trades[-10:], 1):
            pnl_emoji = "π’" if trade['pnl'] > 0 else "π΄"
            trade_lines.append(
                f"{i}. {trade['side']} @ ${trade['entry_price']:.2f} β ${trade['exit_price']:.2f} {pnl_emoji} P&L: ${trade['pnl']:.2f}\n"
            )
        report += "".join(trade_lines)
        report += "\nβββββββββββββββββββββββββββββββββββββββββββ"

        # Save the report before notifying so a delivery failure cannot lose it.
        session_reports[session_id] = {
            'session': session,
            'metrics': {
                'total_trades': total_trades,
                'win_rate': win_rate,
                'total_pnl': total_pnl,
                'winning_trades': winning_trades,
                'losing_trades': losing_trades
            },
            'trades': trades,
            'report': report
        }

        # Send to Telegram; the report is already stored, so a failed
        # notification is logged instead of failing the session.
        try:
            send_telegram(report)
        except Exception as notify_error:
            log(f"Failed to send session report for {symbol}: {notify_error}")

        log(f"π Session report generated for {symbol}")

    # Mark session as completed, unless another coroutine already ended it
    # with a more specific status and end time.
    if session['status'] == 'running':
        session['status'] = 'completed'
        session['end_time'] = datetime.now()
# NOTE(review): no route decorator is visible on this coroutine in this view
# of the file - confirm how it is registered on `app`.
async def root():
    """Return the API banner: name, version and a fixed "running" status."""
    banner = dict(
        message="Bybit Scalping Bot API",
        version="1.0.0",
        status="running",
    )
    return banner
# Captured once at import so get_status() can report real elapsed time.
_API_STARTED_AT = datetime.now()


async def get_status():
    """Summarise the bot: running flag, per-session details, totals and uptime.

    Returns:
        BotStatus: ``is_running`` is true when at least one session has status
        ``'running'``; ``total_pnl`` and ``trades_today`` are summed over
        running sessions only; ``active_sessions`` lists every registered
        session regardless of status; ``uptime`` is the time elapsed since
        this module was imported.
    """
    running = [s for s in active_sessions.values() if s['status'] == 'running']
    total_pnl = sum(s.get('pnl', 0) for s in running)
    total_trades = sum(s.get('trades_executed', 0) for s in running)

    return BotStatus(
        # Finished sessions stay in the registry, so test the status rather
        # than whether the registry is non-empty.
        is_running=bool(running),
        active_sessions=[
            {
                'session_id': sid,
                'symbol': session['symbol'],
                'status': session['status'],
                'start_time': session['start_time'].isoformat(),
                'pnl': session.get('pnl', 0),
                'trades': session.get('trades_executed', 0)
            }
            for sid, session in active_sessions.items()
        ],
        total_pnl=total_pnl,
        trades_today=total_trades,
        # Previously now - fromtimestamp(time.time()), which is always ~0.
        uptime=str(datetime.now() - _API_STARTED_AT)
    )
async def start_symbol_trading(symbol: str, background_tasks: BackgroundTasks, duration_hours: int = 18, max_trades: Optional[int] = None):
    """Queue a background trading session for one configured symbol.

    Args:
        symbol: Trading pair; must appear in the configured pairs.
        background_tasks: Task queue that receives ``run_trading_session``.
        duration_hours: Session length in hours.
        max_trades: Optional trade cap passed through to the session.

    Returns:
        dict: confirmation message, symbol and duration.

    Raises:
        HTTPException: 400 when the symbol is not configured or already has a
        running session.
    """
    _, configured_pairs = load_config()
    if symbol not in configured_pairs:
        raise HTTPException(status_code=400, detail=f"Symbol {symbol} not configured. Available: {configured_pairs}")

    # Refuse to start a second session for a symbol that is already live.
    symbol_is_live = any(
        entry['symbol'] == symbol and entry['status'] == 'running'
        for entry in active_sessions.values()
    )
    if symbol_is_live:
        raise HTTPException(status_code=400, detail=f"Trading already active for {symbol}")

    # The session itself runs after the response has been sent.
    background_tasks.add_task(run_trading_session, symbol, duration_hours, max_trades)

    message = f"π Started trading session for {symbol} ({duration_hours}h duration)"
    send_telegram(message)
    send_hf_notification(message, "session_start")
    log(message)

    return {"message": message, "symbol": symbol, "duration_hours": duration_hours}
async def stop_symbol_trading(symbol: str):
    """Stop every running session of ``symbol`` and shut down its bot.

    Args:
        symbol: Trading pair whose sessions should be stopped.

    Returns:
        dict: confirmation message and the ids of the sessions stopped.

    Raises:
        HTTPException: 404 when the symbol had no running session.
    """
    stopped_sessions = []
    for sid, entry in active_sessions.items():
        if entry['symbol'] != symbol or entry['status'] != 'running':
            continue
        entry['status'] = 'stopped'
        entry['end_time'] = datetime.now()
        stopped_sessions.append(sid)

    # The bot is shut down whenever one exists for the symbol, even if no
    # session was marked as stopped above.
    if symbol in bot_instances:
        components = bot_instances[symbol]
        components['monitor'].stop_monitoring()
        await components['ws_client'].close()

    if not stopped_sessions:
        raise HTTPException(status_code=404, detail=f"No active trading session found for {symbol}")

    message = f"π Stopped trading for {symbol}"
    send_telegram(message)
    send_hf_notification(message, "session_stop")
    log(message)
    return {"message": message, "stopped_sessions": stopped_sessions}
async def get_sessions():
    """Return the session registry together with all stored session reports."""
    overview = {}
    overview["active_sessions"] = active_sessions
    overview["completed_sessions"] = session_reports
    return overview
async def get_session_report(session_id: str):
    """Return the stored report for ``session_id``.

    Raises:
        HTTPException: 404 when no report exists for that id.
    """
    if session_id not in session_reports:
        raise HTTPException(status_code=404, detail="Session report not found")
    return session_reports[session_id]
async def get_analysis_logs(lines: int = 50):
    """Return the most recent analysis-related lines of ``logs/scalper.log``.

    A line counts as analysis-related when it contains any of the keywords
    ``SIGNAL``, ``EMA``, ``RSI``, ``volume``, ``orderbook``, ``analysis`` or
    ``strategy`` (case-sensitive).

    Args:
        lines: Maximum number of matching lines to return; zero or a negative
            value yields an empty list.

    Returns:
        dict: ``logs`` (newest last), ``count`` and ``total_analysis_logs``;
        or an ``error`` payload with empty ``logs`` when the file is missing.

    Raises:
        HTTPException: 500 when the file exists but cannot be read.
    """
    from collections import deque

    keywords = ('SIGNAL', 'EMA', 'RSI', 'volume', 'orderbook', 'analysis', 'strategy')
    log_file = "logs/scalper.log"
    try:
        # A bounded deque keeps only the tail while the file is streamed, and
        # maxlen=0 correctly yields nothing (a [-0:] slice returned everything).
        tail = deque(maxlen=max(lines, 0))
        total_matches = 0
        # errors='replace' keeps one undecodable byte from failing the request.
        with open(log_file, 'r', errors='replace') as f:
            for line in f:
                if any(keyword in line for keyword in keywords):
                    total_matches += 1
                    tail.append(line)
        recent_lines = list(tail)
        return {
            "logs": recent_lines,
            "count": len(recent_lines),
            "total_analysis_logs": total_matches
        }
    except FileNotFoundError:
        return {"error": "Log file not found", "logs": [], "count": 0}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error reading logs: {e}")
async def get_live_logs(lines: int = 20):
    """Return the last ``lines`` lines of ``logs/scalper.log``.

    Args:
        lines: Maximum number of lines to return; zero or a negative value
            yields an empty list.

    Returns:
        dict: ``logs`` (newest last), ``count`` and ``latest_timestamp`` (the
        text before the first ``" - "`` of the newest line, or ``None`` when
        no line is returned); or an ``error`` payload with empty ``logs``
        when the file is missing.

    Raises:
        HTTPException: 500 when the file exists but cannot be read.
    """
    from collections import deque

    log_file = "logs/scalper.log"
    try:
        # Stream the file through a bounded deque instead of loading it all;
        # maxlen=0 yields nothing (a [-0:] slice returned the whole file).
        # errors='replace' keeps one undecodable byte from failing the request.
        with open(log_file, 'r', errors='replace') as f:
            recent_lines = list(deque(f, maxlen=max(lines, 0)))
        return {
            "logs": recent_lines,
            "count": len(recent_lines),
            "latest_timestamp": recent_lines[-1].split(' - ')[0] if recent_lines else None
        }
    except FileNotFoundError:
        return {"error": "Log file not found", "logs": [], "count": 0}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error reading logs: {e}")
async def start_all_pairs(background_tasks: BackgroundTasks, duration_hours: int = 18, max_trades: Optional[int] = None):
    """Queue a background trading session for every configured pair.

    Pairs that already have a running session are reported under
    ``failed_sessions`` with the reason ``"Already running"``.

    Args:
        background_tasks: Task queue that receives ``run_trading_session``.
        duration_hours: Session length in hours, applied to every pair.
        max_trades: Optional trade cap passed through to every session.

    Returns:
        dict: summary message, the started and failed entries, and their counts.
    """
    _, configured_pairs = load_config()

    # Symbols with a live session; the registry does not change while this
    # loop runs because the queued tasks only start after the response.
    live_symbols = {
        entry['symbol']
        for entry in active_sessions.values()
        if entry['status'] == 'running'
    }

    started_sessions, failed_sessions = [], []
    for symbol in configured_pairs:
        if symbol in live_symbols:
            failed_sessions.append({"symbol": symbol, "reason": "Already running"})
            continue
        try:
            background_tasks.add_task(run_trading_session, symbol, duration_hours, max_trades)
            started_sessions.append({"symbol": symbol, "duration_hours": duration_hours})
        except Exception as e:
            failed_sessions.append({"symbol": symbol, "reason": str(e)})

    message = f"π Started trading sessions for {len(started_sessions)} pairs"
    if started_sessions:
        send_telegram(message)
        send_hf_notification(message, "bulk_start")
        log(message)

    return {
        "message": message,
        "started_sessions": started_sessions,
        "failed_sessions": failed_sessions,
        "total_started": len(started_sessions),
        "total_failed": len(failed_sessions)
    }
async def stop_all_pairs():
    """Stop every running session and shut down the bot behind each one.

    Returns:
        dict: a message and the ids of the sessions that were stopped (an
        empty list when nothing was running).
    """
    stopped_sessions = []
    # Iterate over a snapshot: the loop awaits, so the registry may change.
    for sid, entry in list(active_sessions.items()):
        if entry['status'] != 'running':
            continue
        entry['status'] = 'stopped'
        entry['end_time'] = datetime.now()
        stopped_sessions.append(sid)

        # Stop the actual trading
        if entry['symbol'] in bot_instances:
            components = bot_instances[entry['symbol']]
            components['monitor'].stop_monitoring()
            await components['ws_client'].close()

    if not stopped_sessions:
        return {"message": "No active sessions to stop", "stopped_sessions": []}

    message = f"π Stopped all trading sessions ({len(stopped_sessions)} sessions)"
    send_telegram(message)
    send_hf_notification(message, "bulk_stop")
    log(message)
    return {"message": message, "stopped_sessions": stopped_sessions}
async def get_analysis_status():
    """Report the latest price, indicators and strategy checks per running session.

    For every session with status ``'running'`` whose symbol has a bot
    instance, the data engine is queried for the last price, EMA(9), EMA(21),
    RSI(14), a volume-spike flag and the order-book imbalance. A failing
    lookup leaves the affected values as ``None`` instead of failing the
    whole request.

    Returns:
        dict: ``analysis_status`` keyed by symbol, the number of symbols
        reported under ``active_sessions``, and a ``last_updated`` timestamp.
    """
    analysis_status = {}
    for session_id, session in active_sessions.items():
        symbol = session['symbol']
        bot = bot_instances.get(symbol)
        if session['status'] != 'running' or bot is None:
            continue
        engine = bot['data_engine']

        # Get current market data
        try:
            prices = engine.get_prices(symbol, limit=1)
            current_price = prices[-1] if prices else None
        except Exception:
            # Best effort: "except Exception" (not a bare except) so that
            # task cancellation and interpreter exit still propagate.
            current_price = None

        # Get technical indicators
        try:
            ema_fast = engine.calculate_ema(symbol, "1", 9)
            ema_slow = engine.calculate_ema(symbol, "1", 21)
            rsi = engine.calculate_rsi(symbol, "1", 14)
            volume_spike = engine.detect_volume_spike(symbol, "1", 1.3)
            orderbook_imbalance = engine.get_orderbook_imbalance(symbol)
        except Exception:
            ema_fast = ema_slow = rsi = volume_spike = orderbook_imbalance = None

        # Compare against None explicitly: 0.0 is a legitimate indicator
        # value and must not be reported as "unknown".
        have_emas = ema_fast is not None and ema_slow is not None
        trend_up = (ema_fast > ema_slow) if have_emas else None
        rsi_valid = (40 <= rsi <= 70) if rsi is not None else None

        analysis_status[symbol] = {
            "session_id": session_id,
            "current_price": current_price,
            "indicators": {
                "ema_9": ema_fast,
                "ema_21": ema_slow,
                "rsi_14": rsi,
                "volume_spike": volume_spike,
                "orderbook_imbalance": orderbook_imbalance
            },
            "strategy_conditions": {
                "trend_up": trend_up,
                "rsi_valid": rsi_valid,
                "volume_confirmed": volume_spike,
                "orderbook_aligned": abs(orderbook_imbalance or 0) > 0.15
            },
            "last_update": datetime.now().isoformat()
        }

    return {
        "analysis_status": analysis_status,
        "active_sessions": len(analysis_status),
        "last_updated": datetime.now().isoformat()
    }
async def get_notifications(limit: int = 20):
    """Return recent notifications obtained from ``get_hf_notifications``.

    Args:
        limit: Value forwarded to ``get_hf_notifications`` and echoed back.

    Returns:
        dict: the notifications, their count and the requested limit.

    Raises:
        HTTPException: 500 when fetching the notifications fails.
    """
    try:
        items = get_hf_notifications(limit)
        response = {
            "notifications": items,
            "count": len(items),
            "limit_requested": limit,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get notifications: {e}")
    return response
async def emergency_stop():
    """Halt all trading immediately.

    Every running session is marked ``'emergency_stop'``, then every bot
    instance has its monitor stopped and its websocket closed. A failure while
    shutting down one bot is logged and does not prevent the remaining bots
    from being stopped.

    Returns:
        dict: confirmation message and the ids of the sessions that were running.
    """
    stopped_sessions = []
    for session_id, session in list(active_sessions.items()):
        if session['status'] == 'running':
            session['status'] = 'emergency_stop'
            session['end_time'] = datetime.now()
            stopped_sessions.append(session_id)

    # Stop all monitors. Iterate over a snapshot because the loop awaits and
    # another coroutine may register a bot in the meantime, which would raise
    # "dictionary changed size during iteration" on the live dict.
    for symbol, bot in list(bot_instances.items()):
        try:
            bot['monitor'].stop_monitoring()
        except Exception as stop_error:
            log(f"Emergency stop: failed to stop monitor for {symbol}: {stop_error}")
        try:
            await bot['ws_client'].close()
        except Exception as close_error:
            log(f"Emergency stop: failed to close websocket for {symbol}: {close_error}")

    message = "π¨ EMERGENCY STOP activated - All trading halted"
    send_telegram(message)
    send_hf_notification(message, "emergency")
    log(message)

    return {"message": message, "stopped_sessions": stopped_sessions}
if __name__ == "__main__":
    # Script entry point: announce start-up, then serve `app` with uvicorn
    # on all interfaces, port 8000.
    log("π Starting FastAPI server for Bybit Scalping Bot")
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )