import asyncio
import base64
import html
import io
import logging
import math
import warnings
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional, Union

import aiohttp
import matplotlib
# Select the non-interactive backend before pyplot is imported.
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import yfinance as yf
|
|
# Module-wide setup: suppress every warning category and emit INFO-level
# log records through the root handler.
warnings.filterwarnings('ignore')
logging.basicConfig(level=logging.INFO)

# Logger shared by every function and class in this module.
logger = logging.getLogger(__name__)
|
|
|
|
class AsyncTradingGridGenerator:
    """
    Asynchronous trading grid generator for Telegram bots.

    Builds a ladder of buy levels below the current price by merging
    Fibonacci, evenly spaced and ATR-based levels, sizes an order for every
    level and renders the result as an HTML message and a PNG chart.

    The instance owns two worker pools: ``executor`` (a process pool used for
    market-data downloads) and ``chart_executor`` (a thread pool used for
    chart rendering). Call :meth:`close`, or use the instance as a context
    manager, once it is no longer needed so the pools are released.
    """

    # Colour used for each level type in every chart panel.
    _TYPE_COLORS = {
        'Fibonacci': '#FF6B6B',
        'Geometric': '#4ECDC4',
        'Combined': '#45B7D1',
    }

    def __init__(self):
        # Per-strategy parameters:
        #   growth      - base of the per-level weight multiplier (growth ** i)
        #   drop_pct    - depth of the evenly spaced ladder, as a fraction
        #   levels      - number of evenly spaced levels (half of it is used
        #                 for the ATR-based ladder)
        #   risk_factor - extra weight given to deeper levels
        self.strategies = {
            "conservative": {
                "growth": 1.15,
                "drop_pct": 0.2,
                "levels": 5,
                "risk_factor": 0.8,
                "description": "π‘οΈ Conservative strategy with minimal risk",
                "emoji": "π",
            },
            "medium": {
                "growth": 1.3,
                "drop_pct": 0.35,
                "levels": 8,
                "risk_factor": 1.0,
                "description": "βοΈ Balanced strategy with medium aggression",
                "emoji": "π―",
            },
            "aggressive": {
                "growth": 1.5,
                "drop_pct": 0.5,
                "levels": 12,
                "risk_factor": 1.3,
                "description": "π Aggressive strategy with high profitability",
                "emoji": "π₯",
            },
            "ultra_aggressive": {
                "growth": 1.8,
                "drop_pct": 0.7,
                "levels": 15,
                "risk_factor": 1.6,
                "description": "β‘ Maximum aggressive strategy",
                "emoji": "π₯",
            },
        }
        self.executor = ProcessPoolExecutor(max_workers=4)
        self.chart_executor = ThreadPoolExecutor(max_workers=2)

    def close(self) -> None:
        """Release both worker pools without blocking the caller."""
        self.executor.shutdown(wait=False)
        self.chart_executor.shutdown(wait=False)

    def __enter__(self) -> "AsyncTradingGridGenerator":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    @staticmethod
    def _last_or(series: pd.Series, fallback):
        """Return the last value of *series*, or *fallback* when it is NaN."""
        value = series.iloc[-1]
        return fallback if pd.isna(value) else value

    @staticmethod
    def _classify_level(price: float, fib_levels: List[float],
                        geom_levels: List[float], tolerance: float = 0.01) -> str:
        """Label *price* by the ladder it came from (Fibonacci wins ties)."""
        # Prices were rounded to 2 decimals while the source levels were not,
        # so the match has to be tolerance-based rather than exact.
        if any(abs(price - level) < tolerance for level in fib_levels):
            return 'Fibonacci'
        if any(abs(price - level) < tolerance for level in geom_levels):
            return 'Geometric'
        return 'Combined'

    @staticmethod
    def fetch_data(ticker: str, period: str = "1y") -> pd.DataFrame:
        """
        Download price history for *ticker* and add derived columns.

        Declared as a static method because it is submitted to the process
        pool by :meth:`get_stock_data_async`.

        Returns:
            The history frame with ``SMA_20``, ``SMA_50`` and ``Volatility``
            (20-day rolling std of returns scaled by sqrt(252)) columns added.

        Raises:
            ValueError: if no data is returned or the download fails.
        """
        try:
            history = yf.Ticker(ticker).history(period=period)
            if history.empty:
                raise ValueError(f"Data for ticker {ticker} not found")

            close = history['Close']
            history['SMA_20'] = close.rolling(20).mean()
            history['SMA_50'] = close.rolling(50).mean()
            history['Volatility'] = close.pct_change().rolling(20).std() * np.sqrt(252)
            return history
        except Exception as e:
            logger.error(f"Error getting data for {ticker}: {e}")
            raise ValueError(f"Data retrieval error: {e}") from e

    async def get_stock_data_async(self, ticker: str, period: str = "1y") -> pd.DataFrame:
        """Asynchronous stock data retrieval (runs fetch_data in the process pool)."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor, AsyncTradingGridGenerator.fetch_data, ticker, period
        )

    def calculate_technical_indicators(self, data: pd.DataFrame) -> Dict[str, float]:
        """
        Technical indicators calculation.

        Reads the ``High``, ``Low``, ``Close``, ``Volatility``, ``SMA_20`` and
        ``SMA_50`` columns of *data* and returns the latest value of each
        indicator. When the history is too short for a rolling window, a
        fallback is used instead of NaN: the mean true range for ATR, the
        overall low/high for support/resistance, 0.25 for volatility, 50 for
        RSI and the current price for both moving averages.
        """
        close = data['Close']
        high = data['High']
        low = data['Low']
        current_price = close.iloc[-1]

        # True range; the row-wise max skips the NaN produced by shift() on
        # the first row, so that row falls back to plain high - low.
        prev_close = close.shift()
        true_range = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        ).max(axis=1)
        atr = self._last_or(true_range.rolling(14).mean(), true_range.mean())

        support = self._last_or(low.rolling(20).min(), low.min())
        resistance = self._last_or(high.rolling(20).max(), high.max())
        volatility = self._last_or(data['Volatility'], 0.25)

        # RSI from simple 14-period averages of gains and losses.
        delta = close.diff()
        avg_gain = delta.where(delta > 0, 0).rolling(14).mean()
        avg_loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rsi = 100 - (100 / (1 + avg_gain / avg_loss))

        return {
            'current_price': current_price,
            'atr': atr,
            'support': support,
            'resistance': resistance,
            'volatility': volatility,
            'rsi': self._last_or(rsi, 50),
            'sma_20': self._last_or(data['SMA_20'], current_price),
            'sma_50': self._last_or(data['SMA_50'], current_price),
        }

    def fibonacci_levels(self, current_price: float, low: float, high: float) -> List[float]:
        """Fibonacci retracements of [low, high] that lie below current_price, highest first."""
        ratios = (0.236, 0.382, 0.5, 0.618, 0.786, 0.886)
        span = high - low
        candidates = (high - span * ratio for ratio in ratios)
        return sorted(
            (level for level in candidates if level < current_price),
            reverse=True,
        )

    def geometric_levels(self, current_price: float, drop_pct: float, levels: int) -> List[float]:
        """
        Evenly spaced levels from current_price down to current_price * (1 - drop_pct).

        Despite the name, the spacing is linear (a constant price step).
        Returns an empty list when *levels* is not positive.
        """
        if levels <= 0:
            return []
        min_price = current_price * (1 - drop_pct)
        step = (current_price - min_price) / levels
        return [current_price - step * (i + 1) for i in range(levels)]

    def volatility_adjusted_levels(self, current_price: float, atr: float,
                                   volatility: float, levels: int) -> List[float]:
        """Levels spaced by atr * (1 + volatility) below current_price."""
        vol_step = atr * (1 + volatility)
        return [current_price - vol_step * (i + 1) for i in range(levels)]

    def combine_and_optimize_levels(self, fib_levels: List[float],
                                    geom_levels: List[float],
                                    vol_levels: List[float],
                                    current_price: float,
                                    min_distance: float = 0.02) -> List[float]:
        """
        Merge the three ladders into one descending list of distinct prices.

        Levels are rounded to 2 decimals, restricted to (0, current_price),
        and thinned so that consecutive kept levels are at least
        ``min_distance`` apart, measured as a fraction of current_price.
        NaN levels fail the range check and are dropped.
        """
        candidates = sorted(
            {
                round(level, 2)
                for level in fib_levels + geom_levels + vol_levels
                if 0 < level < current_price
            },
            reverse=True,
        )

        kept = []
        previous = current_price
        for level in candidates:
            if abs(previous - level) / current_price >= min_distance:
                kept.append(level)
                previous = level
        return kept

    def calculate_position_sizes(self, levels: List[float], current_price: float,
                                 capital: float, growth: float,
                                 risk_factor: float) -> List[float]:
        """
        Split *capital* across *levels*.

        The weight of level i is ``growth ** i * (1 + drop * risk_factor)``,
        where ``drop`` is the level's fractional distance below
        current_price; weights are normalised so the sizes sum to *capital*.
        """
        if not levels:
            return []

        weights = [
            (growth ** i) * (1 + (current_price - level) / current_price * risk_factor)
            for i, level in enumerate(levels)
        ]
        total_weight = sum(weights)
        return [weight / total_weight * capital for weight in weights]

    def calculate_grid_metrics(self, df: pd.DataFrame, current_price: float) -> Dict[str, float]:
        """
        Grid metrics calculation.

        ``potential_profit`` is the gain if every order fills at its level and
        the price then returns to current_price. Returns an empty dict for an
        empty frame.
        """
        if df.empty:
            return {}

        order_sizes = df['OrderSize']
        prices = df['Price']
        total_capital = order_sizes.sum()
        potential_profit = (order_sizes / prices * (current_price - prices)).sum()

        return {
            'total_capital': total_capital,
            'max_drawdown': df['%Drop'].max(),
            'avg_order_size': order_sizes.mean(),
            'potential_profit': potential_profit,
            'profit_margin': (potential_profit / total_capital) * 100 if total_capital > 0 else 0,
            'number_of_orders': len(df),
        }

    async def generate_grid_async(self, ticker: str, capital: float = 10000,
                                  strategy: str = "medium") -> Tuple[Dict, pd.DataFrame, Dict]:
        """
        Asynchronous grid generation.

        Returns:
            ``(indicators, df, metrics)`` where ``df`` has one row per level
            with columns Level, Price, OrderSize, Shares, %Drop, Distance_ATR
            and Type.

        Raises:
            ValueError: for an unknown strategy, a failed download, or when
                no level survives filtering.
        """
        if strategy not in self.strategies:
            raise ValueError(f"Unknown strategy: {strategy}")
        params = self.strategies[strategy]

        data = await self.get_stock_data_async(ticker)
        indicators = self.calculate_technical_indicators(data)
        current_price = indicators['current_price']
        atr = indicators['atr']

        fib_levels = self.fibonacci_levels(
            current_price, indicators['support'], indicators['resistance']
        )
        geom_levels = self.geometric_levels(
            current_price, params['drop_pct'], params['levels']
        )
        vol_levels = self.volatility_adjusted_levels(
            current_price, atr, indicators['volatility'], params['levels'] // 2
        )

        combined_levels = self.combine_and_optimize_levels(
            fib_levels, geom_levels, vol_levels, current_price
        )
        if not combined_levels:
            raise ValueError("Failed to generate grid levels")

        position_sizes = self.calculate_position_sizes(
            combined_levels, current_price, capital,
            params['growth'], params['risk_factor']
        )

        df = pd.DataFrame({
            'Level': range(1, len(combined_levels) + 1),
            'Price': combined_levels,
            'OrderSize': position_sizes,
            'Shares': [size / price for size, price in zip(position_sizes, combined_levels)],
            '%Drop': [(current_price - price) / current_price * 100 for price in combined_levels],
            'Distance_ATR': [(current_price - price) / atr for price in combined_levels],
            'Type': [
                self._classify_level(price, fib_levels, geom_levels)
                for price in combined_levels
            ],
        })

        metrics = self.calculate_grid_metrics(df, current_price)
        return indicators, df, metrics

    def format_telegram_message(self, ticker: str, indicators: Dict, df: pd.DataFrame,
                                metrics: Dict, strategy: str) -> str:
        """
        Telegram message formatting (HTML parse mode).

        Shows the indicators, the grid metrics and the first five levels. The
        level table is wrapped in ``<pre>`` so it renders as a monospace block
        alongside the other HTML tags.
        """
        strategy_info = self.strategies[strategy]

        trend_emoji = "π" if indicators['current_price'] > indicators['sma_20'] else "π"
        if indicators['rsi'] < 30:
            rsi_status = "π΄ Oversold"
        elif indicators['rsi'] > 70:
            rsi_status = "π’ Overbought"
        else:
            rsi_status = "π‘ Neutral"

        shown = min(5, len(df))
        lines = [
            f"π― <b>TRADING GRID {ticker.upper()}</b>",
            f"{strategy_info['emoji']} <b>Strategy:</b> {strategy.upper()}",
            f"{strategy_info['description']}",
            "",
            "π <b>CURRENT INDICATORS:</b>",
            f"π° Price: <code>${indicators['current_price']:.2f}</code> {trend_emoji}",
            f"π RSI: <code>{indicators['rsi']:.1f}</code> {rsi_status}",
            f"β‘ Volatility: <code>{indicators['volatility']:.1%}</code>",
            f"π― ATR: <code>${indicators['atr']:.2f}</code>",
            f"π» Support: <code>${indicators['support']:.2f}</code>",
            f"πΊ Resistance: <code>${indicators['resistance']:.2f}</code>",
            "",
            "π <b>GRID METRICS:</b>",
            f"π΅ Capital: <code>${metrics['total_capital']:.2f}</code>",
            f"π Max drawdown: <code>{metrics['max_drawdown']:.2f}%</code>",
            f"π― Orders: <code>{metrics['number_of_orders']}</code>",
            f"π° Potential: <code>${metrics['potential_profit']:.2f}</code>",
            f"π Margin: <code>{metrics['profit_margin']:.2f}%</code>",
            "",
            f"π― <b>TOP-{shown} LEVELS:</b>",
            "<pre>β Price Size Drop Type",
        ]

        top = df.head(shown)
        for level, price, size, drop, kind in zip(
                top['Level'], top['Price'], top['OrderSize'], top['%Drop'], top['Type']):
            type_emoji = "π" if kind == 'Fibonacci' else "π" if kind == 'Geometric' else "β‘"
            lines.append(f"{int(level):2.0f} ${price:6.2f} ${size:7.0f} {drop:6.2f}% {type_emoji}")

        message = "\n".join(lines) + "\n</pre>"
        if len(df) > 5:
            message += f"\nπ <i>Showing {shown} of {len(df)} levels</i>"
        return message

    def format_comparison_message(self, ticker: str, comparison_data: List[Dict]) -> str:
        """
        Strategy comparison message formatting for Telegram.

        Each entry of *comparison_data* needs the keys ``strategy``,
        ``orders``, ``max_drawdown``, ``potential_profit`` and
        ``profit_margin``. The entry with the highest ``profit_margin`` is
        named as the recommendation.

        Raises:
            ValueError: if *comparison_data* is empty.
        """
        if not comparison_data:
            raise ValueError("comparison_data must not be empty")

        parts = [f"π <b>STRATEGY COMPARISON {ticker.upper()}</b>\n"]
        for data in comparison_data:
            strategy = data['strategy']
            strategy_info = self.strategies[strategy]
            parts.append(
                f"{strategy_info['emoji']} <b>{strategy.upper()}</b>\n"
                f"π― Orders: <code>{data['orders']}</code>\n"
                f"π Drawdown: <code>{data['max_drawdown']:.2f}%</code>\n"
                f"π° Profit: <code>${data['potential_profit']:.0f}</code>\n"
                f"π Margin: <code>{data['profit_margin']:.1f}%</code>\n"
            )

        best = max(comparison_data, key=lambda x: x['profit_margin'])
        best_name = best['strategy']
        parts.append(
            f"π‘ <b>Recommendation:</b> {self.strategies[best_name]['emoji']} {best_name.upper()}"
        )
        return "".join(parts)

    def create_chart(self, ticker, indicators, df, strategy):
        """
        Render the four-panel grid chart and return it as PNG bytes.

        Panels: order size per level, % drop versus price, cumulative capital
        and the share of each level type. A standalone ``Figure`` is used
        instead of pyplot's global "current figure" because this method runs
        in a thread pool.
        """
        from matplotlib.figure import Figure

        current_price = indicators['current_price']
        default_color = self._TYPE_COLORS['Combined']
        colors = [self._TYPE_COLORS.get(kind, default_color) for kind in df['Type']]

        fig = Figure(figsize=(12, 10))
        (ax1, ax2), (ax3, ax4) = fig.subplots(2, 2)
        fig.suptitle(f'{ticker.upper()} - {strategy}', fontsize=14, fontweight='bold')

        ax1.barh(df['Level'], df['OrderSize'], color=colors, alpha=0.8)
        ax1.set_xlabel('Order Size ($)')
        ax1.set_ylabel('Level')
        ax1.set_title('π Order Distribution')
        ax1.grid(True, alpha=0.3)

        ax2.scatter(df['Price'], df['%Drop'], c=colors, s=df['OrderSize'] / 30, alpha=0.8)
        ax2.axvline(x=current_price, color='red', linestyle='--', alpha=0.7,
                    label=f'${current_price:.2f}')
        ax2.set_xlabel('Price ($)')
        ax2.set_ylabel('% Drop')
        ax2.set_title('π― Levels and Drops')
        ax2.legend()
        ax2.grid(True, alpha=0.3)

        cumulative = df['OrderSize'].cumsum()
        ax3.plot(df['Level'], cumulative, marker='o', linewidth=2, color='#FF6B6B')
        ax3.fill_between(df['Level'], cumulative, alpha=0.3, color='#FF6B6B')
        ax3.set_xlabel('Level')
        ax3.set_ylabel('Capital ($)')
        ax3.set_title('π° Cumulative Capital')
        ax3.grid(True, alpha=0.3)

        # Colour every wedge by its level type so the pie matches the other panels.
        type_counts = df['Type'].value_counts()
        pie_colors = [self._TYPE_COLORS.get(kind, default_color) for kind in type_counts.index]
        ax4.pie(type_counts.values, labels=type_counts.index, autopct='%1.1f%%',
                startangle=90, colors=pie_colors)
        ax4.set_title('π Level Types')
        fig.tight_layout()

        with io.BytesIO() as buf:
            fig.savefig(buf, format='png', dpi=150, bbox_inches='tight')
            return buf.getvalue()

    async def create_grid_chart_async(self, ticker: str, indicators: Dict,
                                      df: pd.DataFrame, strategy: str) -> bytes:
        """Asynchronous grid chart creation (runs create_chart in the thread pool)."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.chart_executor, self.create_chart, ticker, indicators, df, strategy
        )
|
|
|
|
| |
|
|
async def generate_grid_message(ticker: str, capital: float = 10000,
                                strategy: str = "medium") -> Tuple[str, Optional[bytes]]:
    """
    Grid message and chart generation for Telegram bot.

    Never raises: any failure is logged and reported as an HTML error
    message with ``None`` in place of the chart.

    Returns:
        Tuple[str, Optional[bytes]]: (message, chart in bytes)
    """
    generator = AsyncTradingGridGenerator()
    try:
        indicators, df, metrics = await generator.generate_grid_async(ticker, capital, strategy)
        message = generator.format_telegram_message(ticker, indicators, df, metrics, strategy)
        chart = await generator.create_grid_chart_async(ticker, indicators, df, strategy)
        return message, chart
    except Exception as e:
        logger.error(f"Grid generation error for {ticker}: {e}", exc_info=True)
        # The ticker is user input and the exception text is arbitrary;
        # escape both so they cannot break the HTML markup.
        safe_ticker = html.escape(str(ticker))
        safe_error = html.escape(str(e))
        error_message = (
            f"β <b>Grid generation error</b>\n\n"
            f"π Ticker: <code>{safe_ticker}</code>\n"
            f"π« Error: <i>{safe_error}</i>"
        )
        return error_message, None
    finally:
        # The generator is local to this call; release its worker pools.
        generator.executor.shutdown(wait=False)
        generator.chart_executor.shutdown(wait=False)
|
|
|
|
async def compare_strategies_message(ticker: str, capital: float = 10000,
                                     strategies: Optional[List[str]] = None) -> str:
    """
    Strategy comparison for Telegram bot.

    Builds a grid for each strategy in turn (default: conservative, medium,
    aggressive). A strategy whose grid fails is logged and skipped. Never
    raises; failures are reported in the returned message.

    Returns:
        str: Formatted message for Telegram
    """
    if strategies is None:
        strategies = ["conservative", "medium", "aggressive"]

    generator = AsyncTradingGridGenerator()
    comparison_data = []
    try:
        for strategy in strategies:
            try:
                _, _, metrics = await generator.generate_grid_async(ticker, capital, strategy)
            except Exception as e:
                logger.error(f"Error for strategy {strategy}: {e}")
                continue
            comparison_data.append({
                'strategy': strategy,
                'orders': metrics['number_of_orders'],
                'max_drawdown': metrics['max_drawdown'],
                'potential_profit': metrics['potential_profit'],
                'profit_margin': metrics['profit_margin'],
            })

        if not comparison_data:
            return f"β <b>Error</b>\n\nCould not retrieve data for {html.escape(ticker.upper())}"
        return generator.format_comparison_message(ticker, comparison_data)
    except Exception as e:
        logger.error(f"Strategy comparison error for {ticker}: {e}")
        # Escape user input and exception text before embedding them in HTML.
        safe_ticker = html.escape(str(ticker))
        safe_error = html.escape(str(e))
        return (
            f"β <b>Comparison error</b>\n\n"
            f"π Ticker: <code>{safe_ticker}</code>\n"
            f"π« Error: <i>{safe_error}</i>"
        )
    finally:
        # The generator is local to this call; release its worker pools.
        generator.executor.shutdown(wait=False)
        generator.chart_executor.shutdown(wait=False)
|
|
|
|
async def get_available_strategies() -> Dict[str, Dict]:
    """Get the available strategies, keyed by name, with their parameter dicts."""
    generator = AsyncTradingGridGenerator()
    try:
        return generator.strategies
    finally:
        # Only the strategy table is needed; release the pools the
        # constructor created.
        generator.executor.shutdown(wait=False)
        generator.chart_executor.shutdown(wait=False)
|
|
|
|
async def validate_ticker(ticker: str) -> bool:
    """
    Ticker validation.

    Returns True when a 5-day history can be downloaded for *ticker* and
    False when the download fails for any ordinary reason.
    """
    generator = AsyncTradingGridGenerator()
    try:
        await generator.get_stock_data_async(ticker, period="5d")
        return True
    except Exception:
        # Not a bare ``except``: task cancellation and KeyboardInterrupt must
        # propagate instead of being reported as "invalid ticker".
        return False
    finally:
        generator.executor.shutdown(wait=False)
        generator.chart_executor.shutdown(wait=False)
|
|
|
|
| |
class TelegramGridBot:
    """
    Example class for Telegram bot integration.

    Each handler takes the raw command arguments as strings and returns the
    text (and, for the grid command, the optional chart bytes) to send back.
    """

    def __init__(self):
        self.generator = AsyncTradingGridGenerator()

    @staticmethod
    def _parse_capital(capital: str) -> Tuple[Optional[float], Optional[str]]:
        """Return ``(value, None)`` for a valid capital or ``(None, error_text)``."""
        try:
            value = float(capital)
        except (TypeError, ValueError):
            return None, "β Invalid capital format. Enter a number."
        # float() accepts "nan" and "inf", which pass a plain ``<= 0`` check.
        if not math.isfinite(value) or value <= 0:
            return None, "β Capital must be a positive number"
        return value, None

    async def handle_grid_command(self, ticker: str, capital: str = "10000",
                                  strategy: str = "medium") -> Tuple[str, Optional[bytes]]:
        """Grid generation command handler"""
        capital_float, error = self._parse_capital(capital)
        if error is not None:
            return error, None
        try:
            return await generate_grid_message(ticker.upper(), capital_float, strategy.lower())
        except Exception as e:
            return f"β An error occurred: {str(e)}", None

    async def handle_compare_command(self, ticker: str, capital: str = "10000") -> str:
        """Strategy comparison command handler"""
        capital_float, error = self._parse_capital(capital)
        if error is not None:
            return error
        try:
            return await compare_strategies_message(ticker.upper(), capital_float)
        except Exception as e:
            return f"β An error occurred: {str(e)}"

    async def handle_strategies_command(self) -> str:
        """Available strategies list"""
        # Read the table from the generator this bot already owns instead of
        # constructing a new one for every command.
        parts = ["π <b>AVAILABLE STRATEGIES:</b>\n\n"]
        for name, info in self.generator.strategies.items():
            parts.append(
                f"{info['emoji']} <b>{name.upper()}</b>\n"
                f"{info['description']}\n"
                f"π Growth: <code>{info['growth']}</code>\n"
                f"π Max drop: <code>{info['drop_pct']:.0%}</code>\n"
                f"π― Levels: <code>{info['levels']}</code>\n"
            )
        return "".join(parts)
|
|
|
|
| |
async def main():
    """Usage example: build an aggressive NVDA grid, then compare strategies."""
    ticker = "NVDA"

    message, chart = await generate_grid_message(ticker, 10000, "aggressive")
    # chart is None when generation failed; writing it would raise TypeError.
    if chart is not None:
        with open('chart.png', 'wb') as f:
            f.write(chart)
    print("Message:", message)
    print("Chart generated:", chart is not None)

    comparison = await compare_strategies_message(ticker, 10000)
    print("\nComparison:", comparison)


if __name__ == "__main__":
    asyncio.run(main())
|
|