Spaces:
Runtime error
Runtime error
| import asyncio | |
| import ccxt.async_support as ccxt | |
| import yaml | |
| import pandas as pd | |
| import numpy as np | |
| from collections import defaultdict, deque | |
| from datetime import datetime, timedelta | |
| from telegram import Bot | |
| from telegram.constants import ParseMode | |
| from telegram.error import TelegramError | |
| import ta # for RSI etc. | |
| import logging | |
| # === Setup Logging === | |
| logging.basicConfig( | |
| format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # === Load Configuration === | |
| with open('config.yaml', 'r', encoding='utf-8') as f: | |
| cfg = yaml.safe_load(f) | |
| # === Initialize Telegram Bot === | |
| bot = Bot(token=cfg['telegram']['bot_token']) | |
| chat_id = cfg['telegram']['chat_id'] | |
| # === CCXT Exchange === | |
| exchange = ccxt.lbank({ | |
| 'apiKey': cfg['lbank']['api_key'], | |
| 'secret': cfg['lbank']['secret_key'], | |
| 'enableRateLimit': True | |
| }) | |
| PAIRS = cfg['pairs'] | |
| TIMEFRAMES = cfg['timeframes'] # in minutes, e.g. [1,5,15,30,60] | |
| WINDOW = max(TIMEFRAMES) * 2 # for next candle prediction window | |
| # Data storage: for each symbol & timeframe, store trades | |
| trade_buffers = { | |
| sym: {tf: deque() for tf in TIMEFRAMES} for sym in PAIRS | |
| } | |
| async def fetch_and_update(symbol): | |
| try: | |
| trades = await exchange.fetch_trades(symbol) | |
| now = datetime.utcnow() | |
| for trade in trades: | |
| ts = datetime.utcfromtimestamp(trade['timestamp'] / 1000) | |
| size = trade['amount'] | |
| price = trade['price'] | |
| side = trade['side'] # 'buy' or 'sell' | |
| for tf in TIMEFRAMES: | |
| buffer = trade_buffers[symbol][tf] | |
| buffer.append({'time': ts, 'size': size, 'price': price, 'side': side}) | |
| # purge old | |
| while buffer and buffer[0]['time'] < now - timedelta(minutes=tf): | |
| buffer.popleft() | |
| except Exception as e: | |
| logger.error(f"Error fetching trades for {symbol}: {e}") | |
| await notify_status(f"🔴 اتصال به صرافی برای {symbol} با خطا: {e}") | |
| async def analyze_and_signal(): | |
| for symbol in PAIRS: | |
| summary = {} | |
| for tf in TIMEFRAMES: | |
| buf = trade_buffers[symbol][tf] | |
| if not buf: | |
| continue | |
| df = pd.DataFrame(buf) | |
| # compute footprint: volume by price bucket | |
| df['bucket'] = (df['price'] / cfg['price_bucket']).round(0) * cfg['price_bucket'] | |
| grouped = df.groupby('bucket').agg( | |
| buy_vol=('size', lambda x: x[df.loc[x.index,'side']=='buy'].sum()), | |
| sell_vol=('size', lambda x: x[df.loc[x.index,'side']=='sell'].sum()) | |
| ) | |
| # PoC: price level with max total vol | |
| grouped['total'] = grouped['buy_vol'] + grouped['sell_vol'] | |
| poc = grouped['total'].idxmax() | |
| delta = grouped['buy_vol'].sum() - grouped['sell_vol'].sum() | |
| # RSI on volume | |
| vol_series = df['size'].resample(f'{tf}T', on='time').sum().fillna(0) | |
| rsi = ta.momentum.rsi(vol_series, window=14).iloc[-1] | |
| summary[tf] = {'poc': poc, 'delta': delta, 'rsi_vol': rsi} | |
| # signal logic: simple threshold-based example | |
| # check 1h delta positive & rsi_vol > cfg threshold etc. | |
| long_score = sum(1 for tf,val in summary.items() if val['delta'] > 0 and val['rsi_vol'] > cfg['rsi_threshold']) | |
| conf = long_score / len(summary) | |
| if conf >= cfg['signal_confidence']: | |
| tp = summary[TIMEFRAMES[-1]]['poc'] * (1 + cfg['tp_pct']) | |
| sl = summary[TIMEFRAMES[-1]]['poc'] * (1 - cfg['sl_pct']) | |
| text = ( | |
| f"💰 <b>سیگنال خرید</b> {symbol}\n" | |
| f"🔢 امتیاز: {conf*100:.1f}%\n" | |
| f"🕒 TF اصلی: {TIMEFRAMES[-1]} دقیقه\n" | |
| f"📌 PoC: {summary[TIMEFRAMES[-1]]['poc']}\n" | |
| f"🎯 TP: {tp:.4f} | 🛑 SL: {sl:.4f}\n" | |
| f"⏳ مدت زمان پیشنهادی: تا {cfg['max_duration_h']} ساعت" | |
| ) | |
| await send_signal(text) | |
| async def send_signal(message): | |
| try: | |
| await bot.send_message(chat_id=chat_id, text=message, parse_mode=ParseMode.HTML) | |
| except TelegramError as e: | |
| logger.error(f"Failed to send signal: {e}") | |
| async def notify_status(message): | |
| await send_signal(message) | |
| async def main_loop(): | |
| while True: | |
| tasks = [fetch_and_update(sym) for sym in PAIRS] | |
| await asyncio.gather(*tasks) | |
| await analyze_and_signal() | |
| await asyncio.sleep(cfg['fetch_interval_sec']) | |
| if __name__ == '__main__': | |
| loop = asyncio.get_event_loop() | |
| try: | |
| loop.run_until_complete(main_loop()) | |
| except KeyboardInterrupt: | |
| logger.info("Stopped by user") | |
| finally: | |
| loop.run_until_complete(exchange.close()) |