# fullbot / app.py
# alianilikhaniA1's picture
# Update app.py
# 704adea verified
import asyncio
import ccxt.async_support as ccxt
import yaml
import pandas as pd
import numpy as np
from collections import defaultdict, deque
from datetime import datetime, timedelta
from telegram import Bot
from telegram.constants import ParseMode
from telegram.error import TelegramError
import ta # for RSI etc.
import logging
# === Logging ===
# Root logger: timestamped "LEVEL - message" lines at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# === Configuration ===
# config.yaml is parsed once, at import time, relative to the working directory.
with open('config.yaml', 'r', encoding='utf-8') as f:
    cfg = yaml.safe_load(f)

# === Telegram bot ===
bot = Bot(token=cfg['telegram']['bot_token'])
chat_id = cfg['telegram']['chat_id']

# === CCXT exchange (LBank, async client) ===
exchange = ccxt.lbank({
    'apiKey': cfg['lbank']['api_key'],
    'secret': cfg['lbank']['secret_key'],
    'enableRateLimit': True,
})

PAIRS = cfg['pairs']
TIMEFRAMES = cfg['timeframes']  # in minutes, e.g. [1, 5, 15, 30, 60]
WINDOW = 2 * max(TIMEFRAMES)  # for next candle prediction window

# Trade storage: trade_buffers[symbol][timeframe] is a deque of trade records.
trade_buffers = {
    pair: {minutes: deque() for minutes in TIMEFRAMES}
    for pair in PAIRS
}
# Ingestion high-water mark per symbol:
#   symbol -> (timestamp in ms of the newest buffered trade,
#              set of keys of the trades buffered at exactly that timestamp).
# fetch_trades() is polled repeatedly and consecutive responses overlap; this
# mark lets fetch_and_update() skip the trades it has already stored.
_ingest_marks = {}


def _trade_key(trade):
    """Return a hashable identity for *trade*, used to drop re-fetched trades.

    Prefers the trade's ``id`` field; falls back to a tuple of its visible
    fields when no id is present.
    """
    trade_id = trade.get('id')
    if trade_id is not None:
        return trade_id
    return (
        trade.get('timestamp'),
        trade.get('price'),
        trade.get('amount'),
        trade.get('side'),
    )


async def fetch_and_update(symbol):
    """Fetch recent trades for *symbol* and merge the new ones into ``trade_buffers``.

    For every timeframe in ``TIMEFRAMES`` the symbol's buffer receives one
    record per new trade (``time``, ``size``, ``price``, ``side``) and is then
    trimmed so it only holds trades from the last ``tf`` minutes. ``time`` is
    a naive UTC ``datetime``.

    Trades that an earlier call already buffered are skipped, so repeated
    polling does not count the same volume twice and each buffer stays in
    chronological order (which the left-side trimming relies on). Trades
    lacking a timestamp, amount or price are ignored.

    Any failure is logged and reported through ``notify_status``; it is not
    re-raised, so one failing symbol does not abort the polling cycle.

    Args:
        symbol: Market symbol; must be a key of ``trade_buffers``.
    """
    try:
        trades = await exchange.fetch_trades(symbol)
        now = datetime.utcnow()  # naive UTC, same convention as the records
        cutoffs = {tf: now - timedelta(minutes=tf) for tf in TIMEFRAMES}
        buffers = trade_buffers[symbol]
        last_ms, keys_at_last = _ingest_marks.get(symbol, (None, set()))

        # Drop malformed entries up front so a single bad trade cannot abort
        # the whole batch, then process oldest-first to keep buffers ordered.
        usable = [
            t for t in trades
            if t.get('timestamp') is not None
            and t.get('amount') is not None
            and t.get('price') is not None
        ]
        usable.sort(key=lambda t: t['timestamp'])

        for trade in usable:
            ts_ms = trade['timestamp']
            key = _trade_key(trade)
            if last_ms is not None and (
                ts_ms < last_ms or (ts_ms == last_ms and key in keys_at_last)
            ):
                continue  # already buffered by an earlier poll

            ts = datetime.utcfromtimestamp(ts_ms / 1000)
            for tf in TIMEFRAMES:
                if ts < cutoffs[tf]:
                    continue  # already outside this timeframe's window
                buffers[tf].append({
                    'time': ts,
                    'size': trade['amount'],
                    'price': trade['price'],
                    'side': trade.get('side'),  # 'buy' or 'sell'
                })

            # Advance the mark only after the trade is safely buffered.
            if last_ms is None or ts_ms > last_ms:
                last_ms, keys_at_last = ts_ms, set()
            keys_at_last.add(key)
            _ingest_marks[symbol] = (last_ms, keys_at_last)

        # Trim every buffer on every call, even when no trades came back.
        for tf in TIMEFRAMES:
            buf = buffers[tf]
            while buf and buf[0]['time'] < cutoffs[tf]:
                buf.popleft()
    except Exception as e:
        logger.error(f"Error fetching trades for {symbol}: {e}")
        await notify_status(f"🔴 اتصال به صرافی برای {symbol} با خطا: {e}")
def _summarize_trades(buf, tf):
    """Build the footprint summary for one non-empty trade buffer.

    Args:
        buf: Iterable of trade records with ``time``, ``size``, ``price`` and
            ``side`` keys.
        tf: Timeframe in minutes; used as the resampling bin width.

    Returns:
        dict with ``poc`` (price bucket with the largest total volume),
        ``delta`` (buy volume minus sell volume) and ``rsi_vol`` (last value
        of a 14-period RSI over the per-bin traded volume; may be NaN).
    """
    df = pd.DataFrame(list(buf))

    # Footprint: volume per price bucket, split by aggressor side.
    bucket_size = cfg['price_bucket']
    df['bucket'] = (df['price'] / bucket_size).round(0) * bucket_size
    df['buy_vol'] = df['size'].where(df['side'] == 'buy', 0)
    df['sell_vol'] = df['size'].where(df['side'] == 'sell', 0)
    grouped = df.groupby('bucket')[['buy_vol', 'sell_vol']].sum()

    # PoC: price level with max total volume.
    total = grouped['buy_vol'] + grouped['sell_vol']
    poc = total.idxmax()
    delta = grouped['buy_vol'].sum() - grouped['sell_vol'].sum()

    # RSI on volume. Resample the frame (not the bare 'size' Series) so that
    # on='time' can resolve the column; 'min' is the non-deprecated alias.
    # NOTE(review): each buffer is trimmed to the last `tf` minutes by
    # fetch_and_update, so `tf`-minute bins yield only a couple of points —
    # fewer than the 14 the RSI window needs. The result is presumably NaN,
    # which never passes the threshold check. Confirm the intended bin width.
    vol_series = df.resample(f'{tf}min', on='time')['size'].sum().fillna(0)
    rsi = ta.momentum.rsi(vol_series, window=14).iloc[-1]

    return {'poc': poc, 'delta': delta, 'rsi_vol': rsi}


async def analyze_and_signal():
    """Analyze the buffered trades of every pair and send buy signals.

    For each symbol, every timeframe with buffered trades is summarized
    (PoC, delta, volume RSI). The confidence is the share of summarized
    timeframes whose delta is positive and whose volume RSI exceeds
    ``cfg['rsi_threshold']``. When it reaches ``cfg['signal_confidence']``,
    a buy signal with TP/SL derived from the main timeframe's PoC is sent
    through ``send_signal``.

    Symbols with no buffered trades for the main timeframe
    (``TIMEFRAMES[-1]``) are skipped, since TP/SL cannot be computed for them.
    """
    main_tf = TIMEFRAMES[-1]
    for symbol in PAIRS:
        summary = {
            tf: _summarize_trades(trade_buffers[symbol][tf], tf)
            for tf in TIMEFRAMES
            if trade_buffers[symbol][tf]
        }
        # Covers both "no data at all" (which would divide by zero below) and
        # "main timeframe missing" (which would raise KeyError on the PoC).
        if main_tf not in summary:
            continue

        # Signal logic: simple threshold-based vote across timeframes.
        long_score = sum(
            1 for val in summary.values()
            if val['delta'] > 0 and val['rsi_vol'] > cfg['rsi_threshold']
        )
        conf = long_score / len(summary)
        if conf < cfg['signal_confidence']:
            continue

        poc = summary[main_tf]['poc']
        tp = poc * (1 + cfg['tp_pct'])
        sl = poc * (1 - cfg['sl_pct'])
        text = (
            f"💰 <b>سیگنال خرید</b> {symbol}\n"
            f"🔢 امتیاز: {conf*100:.1f}%\n"
            f"🕒 TF اصلی: {main_tf} دقیقه\n"
            f"📌 PoC: {poc}\n"
            f"🎯 TP: {tp:.4f} | 🛑 SL: {sl:.4f}\n"
            f"⏳ مدت زمان پیشنهادی: تا {cfg['max_duration_h']} ساعت"
        )
        await send_signal(text)
async def send_signal(message):
    """Send *message* to the configured Telegram chat, parsed as HTML.

    Delivery is best-effort: a ``TelegramError`` is logged and not re-raised.
    """
    outgoing = dict(chat_id=chat_id, text=message, parse_mode=ParseMode.HTML)
    try:
        await bot.send_message(**outgoing)
    except TelegramError as err:
        logger.error(f"Failed to send signal: {err}")
async def notify_status(message):
    """Send a status message to the Telegram chat by delegating to send_signal."""
    await send_signal(message)
async def main_loop():
    """Run the bot forever: poll all pairs, analyze, then sleep.

    Each cycle fetches trades for every symbol in ``PAIRS`` concurrently,
    runs ``analyze_and_signal`` once, and waits
    ``cfg['fetch_interval_sec']`` seconds before the next cycle.
    """
    while True:
        # One fetch coroutine per pair, awaited together.
        await asyncio.gather(*(fetch_and_update(pair) for pair in PAIRS))
        await analyze_and_signal()
        pause = cfg['fetch_interval_sec']
        await asyncio.sleep(pause)
if __name__ == '__main__':
    # Script entry point: run main_loop() until interrupted, then shut down
    # cleanly (cancel the polling task, close the exchange client, close the
    # event loop).

    # asyncio.get_event_loop() raises RuntimeError on Python 3.14+ when no
    # loop has been set for the main thread; fall back to creating one.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    main_task = loop.create_task(main_loop())
    try:
        loop.run_until_complete(main_task)
    except KeyboardInterrupt:
        logger.info("Stopped by user")
        # Ctrl-C leaves the task pending; cancel it and let it unwind so it
        # is not destroyed mid-await when the loop is closed.
        main_task.cancel()
        try:
            loop.run_until_complete(main_task)
        except asyncio.CancelledError:
            pass
    finally:
        loop.run_until_complete(exchange.close())
        loop.close()