"""Order-flow signal bot.

Polls recent trades from LBank through ccxt, keeps a rolling trade buffer per
symbol and timeframe, derives a volume footprint (point of control and
buy/sell delta) plus an RSI of traded volume, and sends buy signals to a
Telegram chat.
"""

import asyncio
import html
import logging
from collections import defaultdict, deque
from datetime import datetime, timedelta, timezone

import ccxt.async_support as ccxt
import numpy as np
import pandas as pd
import ta  # for RSI etc.
import yaml
from telegram import Bot
from telegram.constants import ParseMode
from telegram.error import TelegramError

# === Setup Logging ===
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

# === Load Configuration ===
with open('config.yaml', 'r', encoding='utf-8') as f:
    cfg = yaml.safe_load(f)

# === Initialize Telegram Bot ===
bot = Bot(token=cfg['telegram']['bot_token'])
chat_id = cfg['telegram']['chat_id']

# === CCXT Exchange ===
exchange = ccxt.lbank({
    'apiKey': cfg['lbank']['api_key'],
    'secret': cfg['lbank']['secret_key'],
    'enableRateLimit': True
})

PAIRS = cfg['pairs']
TIMEFRAMES = cfg['timeframes']  # in minutes, e.g. [1,5,15,30,60]
WINDOW = max(TIMEFRAMES) * 2  # for next candle prediction window

# Number of volume bars the RSI needs before it yields a value.
RSI_WINDOW = 14

# Data storage: for each symbol & timeframe, store trades
trade_buffers = {
    sym: {tf: deque() for tf in TIMEFRAMES}
    for sym in PAIRS
}

# De-duplication watermark per symbol: the newest trade timestamp (ms) already
# buffered, and the keys of the trades seen at exactly that timestamp.
_last_trade_ms = defaultdict(int)
_keys_at_last_ms = defaultdict(set)


def _retention(tf: int) -> timedelta:
    """Return how long trades are kept in the buffer of timeframe *tf*.

    The RSI needs RSI_WINDOW bars of *tf* minutes each; one extra bar covers
    the partially filled bar at the clock-aligned edge.
    """
    return timedelta(minutes=tf * (RSI_WINDOW + 1))


def _trade_key(trade: dict):
    """Return a hashable identity for *trade* (exchange id when available)."""
    return trade.get('id') or (trade['price'], trade['amount'], trade.get('side'))


def _select_new_trades(symbol: str, trades) -> list:
    """Return the trades of *trades* not yet buffered for *symbol*, oldest first.

    Consecutive polls return overlapping trade lists, so anything at or before
    the stored watermark that was already seen is dropped. Trades lacking a
    timestamp, price or amount are ignored. A trade that shows up late with a
    timestamp older than the watermark is dropped as well.
    """
    last_ms = _last_trade_ms[symbol]
    seen_keys = _keys_at_last_ms[symbol]

    usable = [
        t for t in (trades or [])
        if t.get('timestamp') is not None
        and t.get('price') is not None
        and t.get('amount') is not None
    ]
    usable.sort(key=lambda t: t['timestamp'])

    fresh = []
    for trade in usable:
        ts_ms = trade['timestamp']
        if ts_ms < last_ms:
            continue
        if ts_ms == last_ms and _trade_key(trade) in seen_keys:
            continue
        fresh.append(trade)

    if fresh:
        newest_ms = fresh[-1]['timestamp']
        newest_keys = {_trade_key(t) for t in fresh if t['timestamp'] == newest_ms}
        if newest_ms > last_ms:
            _last_trade_ms[symbol] = newest_ms
            _keys_at_last_ms[symbol] = newest_keys
        else:
            seen_keys.update(newest_keys)
    return fresh


async def fetch_and_update(symbol):
    """Fetch recent trades for *symbol* and merge the new ones into every buffer.

    Trades older than the retention period of a timeframe are purged from that
    timeframe's buffer. Any failure is logged and reported to Telegram; it is
    never raised to the caller.
    """
    try:
        trades = await exchange.fetch_trades(symbol)
        now = datetime.now(timezone.utc)
        records = [
            {
                'time': datetime.fromtimestamp(trade['timestamp'] / 1000, tz=timezone.utc),
                'size': trade['amount'],
                'price': trade['price'],
                'side': trade.get('side'),  # 'buy' or 'sell'
            }
            for trade in _select_new_trades(symbol, trades)
        ]
        for tf in TIMEFRAMES:
            buffer = trade_buffers[symbol][tf]
            buffer.extend(records)
            # purge old
            cutoff = now - _retention(tf)
            while buffer and buffer[0]['time'] < cutoff:
                buffer.popleft()
    except Exception as e:
        logger.error(f"Error fetching trades for {symbol}: {e}")
        # The message is sent with HTML parse mode, so raw '<', '>' or '&' in
        # the exception text would make Telegram reject it.
        detail = html.escape(str(e), quote=False)
        await notify_status(f"🔴 اتصال به صرافی برای {symbol} با خطا: {detail}")


def _footprint(trades: pd.DataFrame, bucket_size: float) -> tuple:
    """Return ``(poc, delta)`` for *trades*.

    Prices are rounded to multiples of *bucket_size*. ``poc`` is the bucket
    with the largest buy + sell volume; ``delta`` is total buy volume minus
    total sell volume. Trades whose side is neither 'buy' nor 'sell' count
    towards neither.
    """
    bucket = (trades['price'] / bucket_size).round(0) * bucket_size
    buy_vol = trades['size'].where(trades['side'] == 'buy', 0.0).groupby(bucket).sum()
    sell_vol = trades['size'].where(trades['side'] == 'sell', 0.0).groupby(bucket).sum()
    total = buy_vol + sell_vol
    return total.idxmax(), buy_vol.sum() - sell_vol.sum()


def _volume_rsi(trades: pd.DataFrame, tf: int) -> float:
    """Return the latest RSI of traded volume in *tf*-minute bars (NaN if too few bars)."""
    vol_series = trades.resample(f'{tf}min', on='time')['size'].sum().fillna(0)
    if len(vol_series) < RSI_WINDOW:
        return np.nan
    return ta.momentum.rsi(vol_series, window=RSI_WINDOW).iloc[-1]


async def analyze_and_signal():
    """Score every pair across all timeframes and send a buy signal when warranted.

    For each timeframe the footprint is computed over the trades of the last
    *tf* minutes, and the volume RSI over the whole retained history. A
    timeframe votes "long" when its delta is positive and its volume RSI
    exceeds ``cfg['rsi_threshold']``; a NaN RSI never votes long. A signal is
    sent when the share of long votes reaches ``cfg['signal_confidence']``.
    """
    now = datetime.now(timezone.utc)
    main_tf = TIMEFRAMES[-1]  # last configured timeframe drives PoC/TP/SL

    for symbol in PAIRS:
        summary = {}
        for tf in TIMEFRAMES:
            buf = trade_buffers[symbol][tf]
            if not buf:
                continue
            df = pd.DataFrame(list(buf))
            recent = df[df['time'] >= now - timedelta(minutes=tf)]
            if recent.empty:
                continue
            # compute footprint: volume by price bucket
            poc, delta = _footprint(recent, cfg['price_bucket'])
            summary[tf] = {'poc': poc, 'delta': delta, 'rsi_vol': _volume_rsi(df, tf)}

        if not summary:
            # No data at all for this symbol; nothing to score.
            continue

        # signal logic: simple threshold-based example
        long_score = sum(
            1 for val in summary.values()
            if val['delta'] > 0 and val['rsi_vol'] > cfg['rsi_threshold']
        )
        conf = long_score / len(summary)
        if conf < cfg['signal_confidence']:
            continue
        if main_tf not in summary:
            # TP/SL are anchored on the main timeframe's PoC.
            logger.info(f"No data on main timeframe for {symbol}; signal skipped")
            continue

        main_poc = summary[main_tf]['poc']
        tp = main_poc * (1 + cfg['tp_pct'])
        sl = main_poc * (1 - cfg['sl_pct'])
        text = (
            f"💰 سیگنال خرید {symbol}\n"
            f"🔢 امتیاز: {conf*100:.1f}%\n"
            f"🕒 TF اصلی: {main_tf} دقیقه\n"
            f"📌 PoC: {main_poc}\n"
            f"🎯 TP: {tp:.4f} | 🛑 SL: {sl:.4f}\n"
            f"⏳ مدت زمان پیشنهادی: تا {cfg['max_duration_h']} ساعت"
        )
        await send_signal(text)


async def send_signal(message):
    """Send *message* to the configured chat using HTML parse mode.

    Telegram errors are logged and swallowed.
    """
    try:
        await bot.send_message(chat_id=chat_id, text=message, parse_mode=ParseMode.HTML)
    except TelegramError as e:
        logger.error(f"Failed to send signal: {e}")


async def notify_status(message):
    """Send a status *message* through the same channel as signals."""
    await send_signal(message)


async def main_loop():
    """Poll all pairs, analyze, then sleep ``cfg['fetch_interval_sec']``; repeat forever."""
    while True:
        await asyncio.gather(*(fetch_and_update(sym) for sym in PAIRS))
        try:
            await analyze_and_signal()
        except Exception:
            # One bad analysis cycle must not stop the polling loop.
            logger.exception("Analysis cycle failed")
        await asyncio.sleep(cfg['fetch_interval_sec'])


async def _run():
    """Run the main loop and always close the exchange session afterwards."""
    try:
        await main_loop()
    finally:
        await exchange.close()


if __name__ == '__main__':
    try:
        asyncio.run(_run())
    except KeyboardInterrupt:
        logger.info("Stopped by user")