Spaces:
Runtime error
Runtime error
| # Main Python Processor Script (Master Script) | |
| # Integrates Data Manager, Three Strategies, Signal Aggregation, and Custom Terminal UI with RICH | |
| import time | |
| import json | |
| import threading | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, List, Optional | |
| # --- Import RICH Components --- | |
| from rich.console import Console | |
| from rich.layout import Layout | |
| from rich.panel import Panel | |
| from rich.table import Table | |
| from rich.text import Text | |
| from rich.live import Live | |
| from rich.style import Style | |
| # --- Import Core Components --- | |
| from data_manager import BinanceDataManager | |
| from strategy_1_bb_reentry import analyze_strategy_1 | |
| from strategy_2_doji_ichimoku import analyze_strategy_2, CandleData | |
| from strategy_3_trend_range_volume import analyze_strategy_3 | |
| # --- Configuration --- | |
| CONFIG = { | |
| 'MARKET_SCAN_DELAY_MS': 1000, # Delay between analyzing each market | |
| 'SIGNAL_AGGREGATION_WINDOW_MIN': 7, # Time window for official signal aggregation | |
| 'TELEGRAM_CONFIG_FILE': 'telegram_config.json', | |
| 'MAX_LOG_LINES': 50, | |
| 'MAX_ACTIVE_SIGNALS': 10, | |
| 'MAX_MARKETS': 500 # Max symbols to fetch | |
| } | |
| # --- RICH Styles and Colors (Based on User Request) --- | |
| class Styles: | |
| # Strategy Colors (User requested: Light Blue, Orange, Pistachio Green) | |
| STRAT_1_COLOR = "cyan" | |
| STRAT_2_COLOR = "orange3" | |
| STRAT_3_COLOR = "green3" | |
| # Dynamic Colors for Border (User requested 150 colors, alternating) | |
| BORDER_COLORS = ["red", "green", "blue", "yellow", "magenta", "cyan", "white", "bright_red", "bright_green", "bright_blue", "bright_yellow", "bright_magenta", "bright_cyan", "bright_white"] | |
| # We will cycle through these 14 colors for visual effect. 150 is too many for rich's standard palette. | |
| BORDER_COLOR_INDEX = 0 | |
| # Log Colors (User requested: Pink/Fuchsia, Canary Yellow, Purple/Violet) | |
| LOG_SIGNAL = Style(color="magenta", bold=True) # Pink/Fuchsia for successful signal | |
| LOG_NETWORK = Style(color="yellow", bold=False) # Canary Yellow for network/connections | |
| LOG_CONFIG = Style(color="purple", bold=False) # Purple/Violet for config/personal | |
| LOG_INFO = Style(color="blue", bold=False) | |
| LOG_WARNING = Style(color="yellow", bold=True) | |
| LOG_ERROR = Style(color="red", bold=True) | |
| LOG_CRITICAL = Style(color="white", bgcolor="red", bold=True) | |
| # Signal Colors | |
| SIGNAL_LONG = Style(color="green", bold=True) | |
| SIGNAL_SHORT = Style(color="red", bold=True) | |
| HEADER = Style(color="white", bgcolor="dark_blue", bold=True) | |
| # Light Colors | |
| LIGHT_ON = "bold green" | |
| LIGHT_OFF = "dim white" | |
| # --- State Management --- | |
| class State: | |
| def __init__(self): | |
| self.data_manager = BinanceDataManager() | |
| self.logs: List[Dict[str, Any]] = [] | |
| self.active_signals: Dict[str, Dict[str, Any]] = {} # {symbol: {strategy_id: signal_data}} | |
| self.official_signals: List[Dict[str, Any]] = [] | |
| self.stats = {'total_cycles': 0, 'total_signals': 0} | |
| self.is_running = True | |
| self.telegram_config = self._load_telegram_config() | |
| self.console = Console() | |
| self.last_strat_log: Dict[int, str] = {1: "Waiting...", 2: "Waiting...", 3: "Waiting..."} | |
| self.last_analyzed_symbol: str = "N/A" | |
| self.strategy_names = { | |
| 1: "BB Re-entry", | |
| 2: "Doji Box/Ichimoku", | |
| 3: "Trend/Volume Scalper" | |
| } | |
| def _load_telegram_config(self): | |
| try: | |
| with open(CONFIG['TELEGRAM_CONFIG_FILE'], 'r') as f: | |
| return json.load(f) | |
| except FileNotFoundError: | |
| self.add_log('WARNING', 'Telegram config file not found. Telegram notifications disabled.', Styles.LOG_CONFIG) | |
| return None | |
| except json.JSONDecodeError: | |
| self.add_log('ERROR', 'Invalid Telegram config file format.', Styles.LOG_CONFIG) | |
| return None | |
| def add_log(self, type: str, message: str, style: Style = Styles.LOG_INFO): | |
| timestamp = datetime.now().strftime('%H:%M:%S') | |
| self.logs.append({'time': timestamp, 'type': type, 'message': message, 'style': style}) | |
| if len(self.logs) > CONFIG['MAX_LOG_LINES']: | |
| self.logs.pop(0) | |
| # Update last strategy log for UI | |
| if type.startswith('STRATEGY'): | |
| strat_id = int(type.split(' ')[1]) | |
| self.last_strat_log[strat_id] = message | |
| def update_active_signals(self, symbol: str, strategy_id: int, signal_data: Dict[str, Any]): | |
| if symbol not in self.active_signals: | |
| self.active_signals[symbol] = {} | |
| self.active_signals[symbol][strategy_id] = { | |
| 'signal': signal_data['signal'], | |
| 'time': datetime.now(), | |
| 'entry_price': signal_data.get('curr_close') or signal_data.get('entry_price') | |
| } | |
| self.stats['total_signals'] += 1 | |
| def check_official_signal(self, symbol: str): | |
| """Checks for aggregation: same direction in all 3 strategies within the time window.""" | |
| if symbol not in self.active_signals: | |
| return None | |
| signals = self.active_signals[symbol] | |
| if len(signals) < 3: | |
| return None | |
| # Get the most recent signal time | |
| latest_time = max(s['time'] for s in signals.values()) | |
| # Check if all signals are within the time window | |
| time_window = timedelta(minutes=CONFIG['SIGNAL_AGGREGATION_WINDOW_MIN']) | |
| is_aggregated = all(latest_time - s['time'] <= time_window for s in signals.values()) | |
| if not is_aggregated: | |
| return None | |
| # Check if all signals have the same direction (BUY/LONG or SELL/SHORT) | |
| directions = [s['signal'].replace('LONG', 'BUY').replace('SHORT', 'SELL') for s in signals.values()] | |
| if len(set(directions)) == 1 and directions[0] in ['BUY', 'SELL']: | |
| # Check if this signal has already been issued | |
| if any(s['symbol'] == symbol and s['direction'] == directions[0] and s['status'] == 'ACTIVE' for s in self.official_signals): | |
| return None | |
| # Official Signal Found | |
| official_signal = { | |
| 'symbol': symbol, | |
| 'direction': directions[0], | |
| 'entry_price': signals[1]['entry_price'], # Use entry from Strategy 1 as base | |
| 'time': latest_time, | |
| 'status': 'ACTIVE', | |
| 'pnl_percent': 0.0, | |
| 'pnl_start_time': datetime.now() | |
| } | |
| self.official_signals.append(official_signal) | |
| self.add_log('OFFICIAL SIGNAL', f"Aggregated {directions[0]} signal for {symbol}!", Styles.LOG_SIGNAL) | |
| self._send_telegram_alert(official_signal) | |
| return official_signal | |
| return None | |
| def _send_telegram_alert(self, signal: Dict[str, Any]): | |
| """Placeholder for Telegram notification logic.""" | |
| if not self.telegram_config: | |
| return | |
| # User requested format: 3 circles + Direction + Symbol + Entry Price | |
| direction = signal['direction'] | |
| symbol = signal['symbol'] | |
| entry = signal['entry_price'] | |
| circle = '🟢' if direction == 'BUY' else '🔴' | |
| message = f"{circle * 3} {direction} {symbol} @ {entry:.4f} {circle * 3}" | |
| # In a real scenario, we would use a library like python-telegram-bot here | |
| # to send the message using self.telegram_config['token'] and self.telegram_config['chat_id']. | |
| self.add_log('NETWORK', f"Telegram alert sent: {message}", Styles.LOG_NETWORK) | |
| def update_pnl(self, symbol: str, current_price: float): | |
| """Updates PnL for active official signals.""" | |
| for signal in self.official_signals: | |
| if signal['symbol'] == symbol and signal['status'] == 'ACTIVE': | |
| entry = signal['entry_price'] | |
| if signal['direction'] == 'BUY': | |
| pnl = ((current_price - entry) / entry) * 100 | |
| else: # SELL | |
| pnl = ((entry - current_price) / entry) * 100 | |
| signal['pnl_percent'] = pnl | |
| # Check for 5% profit exit (User's request) | |
| if pnl >= 5.0: | |
| signal['status'] = 'CLOSED_TP' | |
| self.add_log('SUCCESS', f"TP HIT: {symbol} closed at +{pnl:.2f}%", Styles.LOG_SIGNAL) | |
| # Check for -5% loss exit (Implied SL for simplicity, as user didn't provide SL) | |
| elif pnl <= -5.0: | |
| signal['status'] = 'CLOSED_SL' | |
| self.add_log('ERROR', f"SL HIT: {symbol} closed at {pnl:.2f}%", Styles.LOG_ERROR) | |
| # --- RICH UI Rendering --- | |
| def make_header(state: State) -> Panel: | |
| """Creates the main header panel with center-aligned, colorful title.""" | |
| # Cycle border color | |
| current_color = Styles.BORDER_COLORS[Styles.BORDER_COLOR_INDEX % len(Styles.BORDER_COLORS)] | |
| Styles.BORDER_COLOR_INDEX = (Styles.BORDER_COLOR_INDEX + 1) % len(Styles.BORDER_COLORS) | |
| title_text = Text("TERMINAL TAHLILGAR CRYPTO", justify="center") | |
| info_text = Text.assemble( | |
| (f"Cycle: {state.data_manager.cycle_count} | ", Style(color="white")), | |
| (f"Market: {state.data_manager.market_index}/{len(state.data_manager.symbols)} | ", Style(color="white")), | |
| (f"Analyzing: {state.last_analyzed_symbol} | ", Style(color="yellow")), | |
| (f"Active Signals: {len([s for s in state.official_signals if s['status'] == 'ACTIVE'])}", Style(color="green")) | |
| ) | |
| return Panel( | |
| title_text, | |
| title=f"[bold {current_color}]MASTER PROCESSOR[/bold {current_color}]", | |
| border_style=current_color, | |
| subtitle=info_text | |
| ) | |
| def make_strategy_panel(state: State, strat_id: int, color: str) -> Panel: | |
| """Creates a single strategy status panel.""" | |
| title = state.strategy_names[strat_id] | |
| # Cycle border color | |
| current_color = Styles.BORDER_COLORS[Styles.BORDER_COLOR_INDEX % len(Styles.BORDER_COLORS)] | |
| Styles.BORDER_COLOR_INDEX = (Styles.BORDER_COLOR_INDEX + 1) % len(Styles.BORDER_COLORS) | |
| # Check if the last analyzed symbol has a signal from this strategy | |
| has_signal = strat_id in state.active_signals.get(state.last_analyzed_symbol, {}) | |
| # Light status | |
| light = Text("●", style=Styles.LIGHT_ON) if has_signal else Text("○", style=Styles.LIGHT_OFF) | |
| # Content | |
| content = Text.assemble( | |
| (f"{light} ", Style(color=color)), | |
| (f"Last Log: {state.last_strat_log[strat_id]}", Style(color="white")) | |
| ) | |
| return Panel( | |
| content, | |
| title=f"[bold {color}]{title}[/bold {color}]", | |
| border_style=color, | |
| height=3 | |
| ) | |
| def make_signal_table(state: State) -> Panel: | |
| """Creates the large official signal table panel.""" | |
| table = Table( | |
| title="[bold white]OFFICIAL AGGREGATED SIGNALS (MAIN OUTPUT)[/bold white]", | |
| show_header=True, | |
| header_style=Styles.HEADER, | |
| border_style="white", | |
| title_style=Styles.HEADER | |
| ) | |
| table.add_column("Indicator", style="white", justify="center") | |
| table.add_column("Time", style="white", justify="center") | |
| table.add_column("Symbol", style="cyan", justify="left") | |
| table.add_column("Entry Price", style="white", justify="right") | |
| table.add_column("PnL%", style="white", justify="right") | |
| active_signals = [s for s in state.official_signals if s['status'] == 'ACTIVE'] | |
| if not active_signals: | |
| table.add_row(Text("No active official signals.", style="dim yellow"), "", "", "", "") | |
| else: | |
| for signal in active_signals: | |
| direction = signal['direction'] | |
| pnl = signal['pnl_percent'] | |
| # Direction Indicator (Circle + Letter) | |
| dir_style = Styles.SIGNAL_LONG if direction == 'BUY' else Styles.SIGNAL_SHORT | |
| dir_circle = Text("●", style=dir_style) | |
| dir_letter = Text("L" if direction == 'BUY' else "S", style=dir_style) | |
| indicator = Text.assemble(dir_circle, " ", dir_letter) | |
| # Time Elapsed | |
| elapsed = datetime.now() - signal['pnl_start_time'] | |
| minutes = int(elapsed.total_seconds() // 60) | |
| seconds = int(elapsed.total_seconds() % 60) | |
| time_str = f"{minutes:02d}m {seconds:02d}s" | |
| # PnL | |
| pnl_style = Styles.SIGNAL_LONG if pnl >= 0 else Styles.SIGNAL_SHORT | |
| pnl_text = Text(f"{pnl:+.2f}%", style=pnl_style) | |
| table.add_row( | |
| indicator, | |
| time_str, | |
| signal['symbol'], | |
| f"{signal['entry_price']:.4f}", | |
| pnl_text | |
| ) | |
| return Panel(table, title="[bold white]Official Signals[/bold white]", border_style="white") | |
| def make_logs_panel(state: State) -> Panel: | |
| """Creates the system logs panel.""" | |
| log_text = Text() | |
| for log in state.logs: | |
| log_text.append(f"[{log['time']}] {log['type']}: {log['message']}\n", style=log['style']) | |
| # Cycle border color | |
| current_color = Styles.BORDER_COLORS[Styles.BORDER_COLOR_INDEX % len(Styles.BORDER_COLORS)] | |
| Styles.BORDER_COLOR_INDEX = (Styles.BORDER_COLOR_INDEX + 1) % len(Styles.BORDER_COLORS) | |
| return Panel( | |
| log_text, | |
| title="[bold white]LOGS[/bold white]", | |
| border_style=current_color, | |
| height=15, | |
| # Autoscroll is handled by always appending to the Text object and rich's rendering | |
| ) | |
| def render_ui(state: State) -> Layout: | |
| """Renders the complete UI layout.""" | |
| layout = Layout(name="root") | |
| layout.split( | |
| Layout(name="header", size=5), | |
| Layout(name="strategy_status", ratio=1), | |
| Layout(name="signal_table", ratio=2), | |
| Layout(name="logs", ratio=3) | |
| ) | |
| layout["header"].update(make_header(state)) | |
| layout["strategy_status"].split_row( | |
| make_strategy_panel(state, 1, Styles.STRAT_1_COLOR), | |
| make_strategy_panel(state, 2, Styles.STRAT_2_COLOR), | |
| make_strategy_panel(state, 3, Styles.STRAT_3_COLOR) | |
| ) | |
| layout["signal_table"].update(make_signal_table(state)) | |
| layout["logs"].update(make_logs_panel(state)) | |
| return layout | |
| # --- Main Execution Loop --- | |
| def main_loop(state: State, live: Live): | |
| # 1. Initial setup | |
| state.add_log('INFO', 'Initializing Master Processor...', Styles.LOG_CONFIG) | |
| state.data_manager.fetch_symbols(limit=CONFIG['MAX_MARKETS']) | |
| # 2. Main Scan Loop | |
| while state.is_running: | |
| symbol = state.data_manager.get_next_symbol() | |
| if not symbol: | |
| state.add_log('WARNING', 'No symbols to scan. Retrying in 10s.', Styles.LOG_WARNING) | |
| time.sleep(10) | |
| continue | |
| state.last_analyzed_symbol = symbol | |
| state.add_log('INFO', f"Analyzing {symbol}...", Styles.LOG_INFO) | |
| # 3. Get Data for all strategies | |
| # Strategy 2 requires max data (15m: 100, 1m: 50) | |
| data = state.data_manager.get_data_for_strategy(symbol, 2) | |
| klines_15m = data.get('klines_15m', []) | |
| klines_1m = data.get('klines_1m', []) | |
| if not klines_15m or not klines_1m: | |
| state.add_log('ERROR', f"Failed to get data for {symbol}", Styles.LOG_ERROR) | |
| # Render UI to show error | |
| live.update(render_ui(state)) | |
| time.sleep(CONFIG['MARKET_SCAN_DELAY_MS'] / 1000) | |
| continue | |
| # 4. Run Strategies | |
| # Strategy 1: BB Re-entry (needs 15m) | |
| result_1 = analyze_strategy_1(klines_15m) | |
| if result_1['signal'] in ['BUY', 'SELL']: | |
| state.update_active_signals(symbol, 1, result_1) | |
| # User requested to include symbol and color in the log | |
| direction = result_1['signal'] | |
| log_style = Styles.SIGNAL_LONG if direction == 'BUY' else Styles.SIGNAL_SHORT | |
| log_message = f"Signal {direction} for {symbol} @ {result_1['curr_close']:.4f}" | |
| state.add_log('STRATEGY 1', log_message, log_style) | |
| # Strategy 2: Doji Box (needs 15m and 1m) | |
| result_2 = analyze_strategy_2(klines_15m, klines_1m) | |
| if result_2['signal'] in ['LONG', 'SHORT']: | |
| state.update_active_signals(symbol, 2, result_2) | |
| # User requested to include symbol and color in the log | |
| direction = result_2['signal'] | |
| log_style = Styles.SIGNAL_LONG if direction == 'LONG' else Styles.SIGNAL_SHORT | |
| log_message = f"Signal {direction} for {symbol} @ {result_2['entry_price']:.4f}" | |
| state.add_log('STRATEGY 2', log_message, log_style) | |
| # Strategy 3: Trend/Volume Scalper (needs 3m or 1m) | |
| klines_3m = data.get('klines_3m', []) | |
| klines_1m_for_strat3 = data.get('klines_1m', []) | |
| # Use 3m if available, otherwise use 1m | |
| klines_for_strat3 = klines_3m if klines_3m else klines_1m_for_strat3 | |
| if not klines_for_strat3: | |
| state.add_log('ERROR', f"Failed to get 3m/1m data for Strategy 3 on {symbol}", Styles.LOG_ERROR) | |
| else: | |
| result_3 = analyze_strategy_3(klines_for_strat3) | |
| if result_3['signal'] in ['BUY', 'SELL']: | |
| state.update_active_signals(symbol, 3, result_3) | |
| # User requested to include symbol and color in the log | |
| direction = result_3['signal'] | |
| log_style = Styles.SIGNAL_LONG if direction == 'BUY' else Styles.SIGNAL_SHORT | |
| log_message = f"Signal {direction} for {symbol} ({result_3['trend']})" | |
| state.add_log('STRATEGY 3', log_message, log_style) | |
| # 5. Check for Official Signal | |
| state.check_official_signal(symbol) | |
| # 6. Update PnL for active signals | |
| # Fetch current price for PnL update | |
| current_price_data = state.data_manager._fetch_api('/fapi/v1/ticker/price', {'symbol': symbol}) | |
| if current_price_data and 'price' in current_price_data: | |
| current_price = float(current_price_data['price']) | |
| state.update_pnl(symbol, current_price) | |
| # 7. Render UI | |
| live.update(render_ui(state)) | |
| # 8. Wait for next market scan | |
| time.sleep(CONFIG['MARKET_SCAN_DELAY_MS'] / 1000) | |
| # --- Entry Point --- | |
| if __name__ == '__main__': | |
| # Create a dummy telegram_config.json for demonstration | |
| dummy_config = { | |
| "token": "YOUR_TELEGRAM_BOT_TOKEN", | |
| "chat_id": "YOUR_TELEGRAM_CHAT_ID" | |
| } | |
| try: | |
| with open(CONFIG['TELEGRAM_CONFIG_FILE'], 'w') as f: | |
| json.dump(dummy_config, f, indent=4) | |
| except Exception as e: | |
| print(f"Error creating telegram_config.json: {e}") | |
| state = State() | |
| try: | |
| with Live(render_ui(state), screen=True, refresh_per_second=4) as live: | |
| # Run a few cycles for demonstration | |
| for _ in range(5): # Run 5 cycles of the market list | |
| main_loop(state, live) | |
| if not state.is_running: | |
| break | |
| except KeyboardInterrupt: | |
| state.is_running = False | |
| state.console.print("\n[bold red]Master Processor stopped by user.[/bold red]") | |
| except Exception as e: | |
| state.add_log('CRITICAL', f"Loop Error: {e}", Styles.LOG_CRITICAL) | |
| state.console.print(f"\n[bold red]An unexpected error occurred:[/bold red] {e}") | |
| finally: | |
| state.console.print("\n[bold green]Master Processor finished execution.[/bold green]") | |
| state.console.print("[bold white]Final Official Signals:[/bold white]") | |
| for s in state.official_signals: | |
| pnl_style = Styles.SIGNAL_LONG if s['pnl_percent'] >= 0 else Styles.SIGNAL_SHORT | |
| pnl_text = Text(f"{s['pnl_percent']:+.2f}%", style=pnl_style) | |
| state.console.print(f"[{s['status']}] {s['direction']} {s['symbol']} @ {s['entry_price']:.4f} | PnL: {pnl_text}") | |