# gate333 / main.py
# luguog's picture
# Update main.py
# 1ef4dd9 verified
import os
import pandas as pd
import pandas_ta as ta
import datetime
import pytz
import logging
import asyncio
import websockets
import json
import csv
from collections import defaultdict
from typing import Dict, List, Optional
import threading
import time
# --- AI Library ---
from litellm import completion
from litellm.exceptions import APIError
# --- Setup and Constants ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Target period of the trading loop in seconds; the loop sleeps for whatever
# is left of this period after each analysis pass.
ANALYSIS_FREQUENCY_SECONDS = 30
# Binance WebSocket base URL and the stream name appended to it.
BINANCE_WS_BASE = "wss://fstream.binance.com/ws/"
WEBSOCKET_STREAM = "!miniTicker@arr"
WEBSOCKET_URL = BINANCE_WS_BASE + WEBSOCKET_STREAM
# --- AI Model Configuration (model identifiers passed to LiteLLM) ---
# Each entry is used verbatim as the `model` argument of litellm.completion.
OPENROUTER_MODELS = [
    "openrouter/openai/gpt-4o",
    "openrouter/openai/gpt-4o-mini",
    "openrouter/mistralai/mistral-large",
    "openrouter/mistralai/mistral-large-2411",
    "openrouter/perplexity/pplx-7b-chat",
    "openrouter/perplexity/pplx-70b-online",
    "openrouter/anthropic/claude-3-opus",
    "openrouter/anthropic/claude-3-sonnet",
    "openrouter/anthropic/claude-3-haiku",
    "openrouter/google/gemini-2.0-flash-exp:free",
    "openrouter/google/gemini-2.5-pro-preview-03-25",
    "openrouter/meta-llama/llama-3-8b-instruct:free",
    "openrouter/meta-llama/llama-3-70b-instruct:free",
    "openrouter/nousresearch/nous-hermes-2-mixtral-8x7b-dpo",
    "openrouter/qwen/qwen-2.5-72b-instruct",
    "openrouter/deepseek/deepseek-chat",
    "openrouter/deepseek/deepseek-coder",
]
GROQ_MODELS = [
    "groq/llama-3.1-8b-instant",
    "groq/llama-3.1-70b-versatile",
    "groq/llama-3.2-11b-vision-preview",
    "groq/llama-3.2-90b-vision-preview",
    "groq/llama-3.2-3b-preview",
    "groq/mixtral-8x7b-32768",
    "groq/gemma2-9b-it",
]
HUGGINGFACE_MODELS = [
    "huggingface/mistralai/Mistral-7B-Instruct-v0.2",
    "huggingface/microsoft/DialoGPT-large",
    "huggingface/google/flan-t5-xxl",
    "huggingface/tiiuae/falcon-7b-instruct",
    "huggingface/OpenAssistant/oasst-sft-4-pythia-12b-epoch-3.5",
]
# Combine all provider lists, capped at 33 entries.
# NOTE(review): the three lists above total 29 identifiers (17 + 7 + 5), so
# the slice does not truncate anything and ALL_MODELS currently has 29
# entries, not 33.
ALL_MODELS = (OPENROUTER_MODELS + GROQ_MODELS + HUGGINGFACE_MODELS)[:33]
# Symbols the system tracks; ticker updates for any other symbol are ignored.
TARGET_SYMBOLS = ['BTCUSDT', 'ETHUSDT', 'SOLUSDT', 'BNBUSDT', 'XRPUSDT']
# --- Data Structures ---
# symbol -> {'1m' | '4h' | '1d' -> DataFrame}; every frame starts out empty.
ALL_PERPS_DATA = {symbol: {'1m': pd.DataFrame(), '4h': pd.DataFrame(), '1d': pd.DataFrame()} for symbol in TARGET_SYMBOLS}
# Shared in-memory trading state:
#   'positions'           - symbol -> open position dict
#   'signals_history'     - list of consensus records read by the HTTP API
#   'performance_metrics' - defaultdict(list); closed trades are appended
#                           under the 'trades' key
TRADING_STATE = {
    'positions': {},
    'signals_history': [],
    'performance_metrics': defaultdict(list)
}
# File that receives one row per trading decision.
CSV_FILENAME = "trading_performance.csv"
# --- Technical Analysis Functions ---
def get_td_sequential(series):
    """Return a 9-bar momentum count loosely modelled on TD Sequential.

    For every position from index 8 onward the value is the number of bars in
    the trailing 9-bar window whose value exceeds the value four bars earlier.
    Bars at positions 0-3 have no bar four back and are always counted.
    The first eight positions (and every position when fewer than nine values
    are supplied) are 0. The result shares the input's index.
    """
    total = len(series)
    if total < 9:
        return pd.Series([0] * total, index=series.index)

    counts = [0] * 8
    for end in range(8, total):
        hits = sum(
            1
            for pos in range(end - 8, end + 1)
            if pos < 4 or series.iloc[pos] > series.iloc[pos - 4]
        )
        counts.append(hits)
    return pd.Series(counts, index=series.index)
def format_value(value):
    """Render a scalar for display in the prompt.

    Missing values (None / NaN) become 'N/A', plain ints and floats are shown
    with thousands separators and two decimals, and anything else falls back
    to ``str(value)``.
    """
    if pd.isna(value):
        return 'N/A'
    is_plain_number = isinstance(value, (int, float))
    return f"{value:,.2f}" if is_plain_number else str(value)
def add_technical_indicators(df):
    """Return *df* extended with moving averages, oscillators and TD count.

    Frames with fewer than 50 rows are returned untouched (the same object),
    because the longer look-back indicators cannot be computed on them.
    Otherwise the indicators are added to a copy, so the caller's frame
    (typically one of the shared ``ALL_PERPS_DATA`` frames) is never mutated.

    Args:
        df: Price frame indexed by time with at least a 'Close' column.
            'High', 'Low' and 'Volume' are synthesised when absent.

    Returns:
        A DataFrame with SMA_20/50/200, RSI_14, ADX, TD_Seq and whatever
        columns pandas_ta emits for MACD, Bollinger Bands and Stochastic.
    """
    if df.shape[0] < 50:
        return df

    # Copy first: assigning columns below would otherwise write into the
    # caller's frame (or into a slice of it, triggering pandas
    # SettingWithCopy warnings).
    df = df.copy()

    # Ensure required columns exist; the fallbacks are rough placeholders
    # derived from Close so the High/Low based indicators can still run.
    if 'High' not in df.columns:
        df['High'] = df['Close'] * 1.001
    if 'Low' not in df.columns:
        df['Low'] = df['Close'] * 0.999
    if 'Volume' not in df.columns:
        df['Volume'] = 1000

    # Moving averages
    df['SMA_20'] = ta.sma(df['Close'], length=20)
    df['SMA_50'] = ta.sma(df['Close'], length=50)
    df['SMA_200'] = ta.sma(df['Close'], length=200)

    # RSI
    df['RSI_14'] = ta.rsi(df['Close'], length=14)

    # MACD (pandas_ta returns None when it cannot compute the indicator)
    macd = ta.macd(df['Close'])
    if macd is not None:
        df = df.join(macd)

    # ADX: only the ADX line itself is kept, under a shorter column name.
    adx_data = ta.adx(df['High'], df['Low'], df['Close'], length=14)
    if adx_data is not None:
        df['ADX'] = adx_data['ADX_14']

    # Bollinger Bands
    bbands = ta.bbands(df['Close'], length=20)
    if bbands is not None:
        df = df.join(bbands)

    # Stochastic
    stoch = ta.stoch(df['High'], df['Low'], df['Close'])
    if stoch is not None:
        df = df.join(stoch)

    # TD Sequential
    df['TD_Seq'] = get_td_sequential(df['Close'])
    return df
def format_data_for_gpt(latest_daily, latest_4h, latest_1m, symbol):
    """Build the multi-timeframe prompt sent to each AI model.

    Each ``latest_*`` argument is the most recent row of the corresponding
    timeframe. Any indicator that is absent from a row, or NaN, is rendered
    as 'N/A'; everything else goes through ``format_value``.
    """

    def field(row, column, fallback='N/A'):
        # Missing columns and NaN values both collapse to the fallback text.
        if column not in row or not pd.notna(row[column]):
            return fallback
        return format_value(row[column])

    return f"""
Technical Analysis Request for {symbol}
DAILY TIMEFRAME:
- Price: {field(latest_daily, 'Close')}
- SMA 20: {field(latest_daily, 'SMA_20')}
- SMA 50: {field(latest_daily, 'SMA_50')}
- SMA 200: {field(latest_daily, 'SMA_200')}
- RSI 14: {field(latest_daily, 'RSI_14')}
- ADX: {field(latest_daily, 'ADX')}
- MACD: {field(latest_daily, 'MACD_12_26_9')}
- TD Sequential: {field(latest_daily, 'TD_Seq')}
4-HOUR TIMEFRAME:
- Price: {field(latest_4h, 'Close')}
- SMA 20: {field(latest_4h, 'SMA_20')}
- SMA 50: {field(latest_4h, 'SMA_50')}
- RSI 14: {field(latest_4h, 'RSI_14')}
- ADX: {field(latest_4h, 'ADX')}
- MACD: {field(latest_4h, 'MACD_12_26_9')}
1-MINUTE TIMEFRAME:
- Price: {field(latest_1m, 'Close')}
- RSI 14: {field(latest_1m, 'RSI_14')}
- ADX: {field(latest_1m, 'ADX')}
Based on multi-timeframe technical analysis, provide ONLY a single word: 'BUY', 'SELL', or 'HOLD'.
Consider trend alignment, momentum, and overbought/oversold conditions across timeframes.
"""
# --- AI Consensus Functions ---
async def get_ai_signal(prompt: str, model_name: str):
    """Ask one model for a verdict and normalise it to BUY / SELL / HOLD.

    The blocking ``completion`` call runs in a worker thread. The reply is
    upper-cased and matched by substring, BUY first, then SELL, then the HOLD
    synonyms. Any failure, including an unrecognised reply, is reported as a
    string starting with ``"ERROR: "`` instead of raising.
    """
    system_prompt = """You are a professional trading analyst. Analyze the technical data and provide ONLY a single word: 'BUY', 'SELL', or 'HOLD'.
Consider:
- Trend alignment across timeframes
- RSI overbought (>70) or oversold (<30) conditions
- MACD momentum signals
- Support/resistance levels
- Overall market structure
Respond with exactly one word: BUY, SELL, or HOLD."""
    # Checked top to bottom; the first verdict with a matching keyword wins.
    verdict_keywords = (
        ('BUY', ('BUY',)),
        ('SELL', ('SELL',)),
        ('HOLD', ('HOLD', 'NEUTRAL', 'WAIT')),
    )
    chat = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": prompt},
    ]
    try:
        response = await asyncio.to_thread(
            completion,
            messages=chat,
            model=model_name,
            max_tokens=10,
            temperature=0.3,
            timeout=30,
        )
        signal = response.choices[0].message.content.strip().upper()
        for verdict, keywords in verdict_keywords:
            if any(keyword in signal for keyword in keywords):
                return verdict
        return f"ERROR: Invalid response: {signal}"
    except Exception as e:
        return f"ERROR: {e.__class__.__name__}: {str(e)}"
async def get_consensus_for_symbol(symbol: str, prompt: str):
    """Poll every model in ``ALL_MODELS`` and reduce the votes to one signal.

    All models are queried concurrently. Replies other than BUY/SELL/HOLD
    (including raised exceptions) are tallied as ERROR and still count in the
    denominator, so failing models dilute the percentages. BUY or SELL is
    chosen only when it holds more than 40% of all votes and beats the
    opposite side; otherwise the result is HOLD.

    Args:
        symbol: Market symbol the prompt refers to.
        prompt: User prompt forwarded unchanged to each model.

    Returns:
        A dict with the final signal, its confidence (vote share), the raw
        tally, per-side percentages, the number of models polled and the list
        of per-model error descriptions.
    """
    logging.info(f"Getting {len(ALL_MODELS)}-model consensus for {symbol}...")
    results = await asyncio.gather(
        *(get_ai_signal(prompt, model) for model in ALL_MODELS),
        return_exceptions=True,
    )

    # Tally results
    tally = defaultdict(int)
    error_details = []
    for model, result in zip(ALL_MODELS, results):
        if isinstance(result, Exception):
            tally['ERROR'] += 1
            error_details.append(f"Model {model}: {str(result)}")
        elif result in ['BUY', 'SELL', 'HOLD']:
            tally[result] += 1
        else:
            tally['ERROR'] += 1
            error_details.append(f"Model {model}: {result}")

    total_votes = len(results)
    if total_votes:
        buy_pct = tally['BUY'] / total_votes
        sell_pct = tally['SELL'] / total_votes
        hold_pct = tally['HOLD'] / total_votes
    else:
        # No models configured: report a zero-confidence HOLD rather than
        # dividing by zero.
        buy_pct = sell_pct = hold_pct = 0.0

    # Determine final signal (require >40% of all votes for action)
    if buy_pct > 0.4 and buy_pct > sell_pct:
        final_signal = 'BUY'
        confidence = buy_pct
    elif sell_pct > 0.4 and sell_pct > buy_pct:
        final_signal = 'SELL'
        confidence = sell_pct
    else:
        final_signal = 'HOLD'
        confidence = max(buy_pct, sell_pct, hold_pct)

    return {
        'symbol': symbol,
        'timestamp': datetime.datetime.now(pytz.utc),
        'final_signal': final_signal,
        'confidence': confidence,
        'vote_tally': dict(tally),
        'total_models': total_votes,
        'errors': error_details,
        'buy_percentage': buy_pct,
        'sell_percentage': sell_pct,
        'hold_percentage': hold_pct
    }
# --- Trading Execution Logic ---
def execute_trading_decision(symbol: str, consensus_data: dict, current_price: float):
    """Apply a consensus signal to the in-memory position book.

    Rules, in order:
      * confidence below 0.5 -> no trade;
      * a price that is not strictly positive (0, negative or NaN) -> no
        trade, since the caller passes 0 when it has no price data and a
        position must not be opened or closed at a meaningless price;
      * BUY / SELL with no open position -> open a LONG / SHORT of size 0.01;
      * HOLD with an open position -> close it and record the trade;
      * anything else (e.g. an opposite signal while a position is open)
        -> no action.

    Args:
        symbol: Market symbol being traded.
        consensus_data: Dict providing 'final_signal' and 'confidence'.
        current_price: Latest price used for entry or exit.

    Returns:
        ``(action_code, human_readable_reason)``.
    """
    signal = consensus_data['final_signal']
    confidence = consensus_data['confidence']

    # Only trade if confidence is high enough
    if confidence < 0.5:
        return "NO_TRADE", "Low confidence"

    # `not x > 0` (rather than `x <= 0`) also rejects NaN.
    if not current_price > 0:
        return "NO_TRADE", "Invalid price"

    current_positions = TRADING_STATE['positions']
    has_position = symbol in current_positions

    if signal == 'BUY' and not has_position:
        # Enter long position
        current_positions[symbol] = {
            'entry_price': current_price,
            'entry_time': datetime.datetime.now(pytz.utc),
            'position_type': 'LONG',
            'size': 0.01  # Fixed position size for demo
        }
        return "ENTER_LONG", f"Entered LONG at {current_price}"

    if signal == 'SELL' and not has_position:
        # Enter short position
        current_positions[symbol] = {
            'entry_price': current_price,
            'entry_time': datetime.datetime.now(pytz.utc),
            'position_type': 'SHORT',
            'size': 0.01
        }
        return "ENTER_SHORT", f"Entered SHORT at {current_price}"

    if signal == 'HOLD' and has_position:
        # Exit position
        position = current_positions[symbol]
        pnl = calculate_pnl(position, current_price)
        del current_positions[symbol]
        # Record trade
        trade_record = {
            'symbol': symbol,
            'entry_time': position['entry_time'],
            'exit_time': datetime.datetime.now(pytz.utc),
            'position_type': position['position_type'],
            'entry_price': position['entry_price'],
            'exit_price': current_price,
            'pnl': pnl
        }
        TRADING_STATE['performance_metrics']['trades'].append(trade_record)
        return "EXIT_POSITION", f"Exited {position['position_type']} with PnL: {pnl:.4f}"

    return "NO_ACTION", "No trading action taken"
def calculate_pnl(position: dict, current_price: float) -> float:
    """Return the profit or loss of *position* if closed at *current_price*.

    A 'LONG' position gains when price rises; every other position type is
    treated as a short and gains when price falls. The price move is scaled
    by the position's 'size'.
    """
    is_long = position['position_type'] == 'LONG'
    entry = position['entry_price']
    move = (current_price - entry) if is_long else (entry - current_price)
    return move * position['size']
# --- CSV Logging ---
def initialize_csv_log():
    """Create (or truncate) the CSV log and write its header row."""
    header_row = [
        'timestamp',
        'symbol',
        'final_signal',
        'confidence',
        'buy_votes',
        'sell_votes',
        'hold_votes',
        'error_count',
        'buy_percentage',
        'sell_percentage',
        'hold_percentage',
        'action_taken',
        'action_reason',
        'current_price',
        'position_type',
        'entry_price',
        'realized_pnl',
    ]
    # Mode 'w' discards any previous log content.
    with open(CSV_FILENAME, 'w', newline='') as log_file:
        csv.writer(log_file).writerow(header_row)
def log_to_csv(consensus_data: dict, action_data: tuple, current_price: float):
    """Append one decision row to the CSV log.

    Args:
        consensus_data: Consensus dict for the symbol (signal, confidence,
            vote tally and percentages, timestamp).
        action_data: ``(action_taken, action_reason)`` as returned by
            ``execute_trading_decision``.
        current_price: Price the decision was evaluated at.

    The position columns describe the position that is open *after* the
    action was applied ('NONE' / 0 when there is none). ``realized_pnl`` is
    the running total over all closed trades.
    """
    action_taken, action_reason = action_data
    symbol = consensus_data['symbol']
    votes = consensus_data['vote_tally']

    # Get position info
    position = TRADING_STATE['positions'].get(symbol, {})

    # Nothing in this module stores a 'total_pnl' metric, so reading it alone
    # always logged 0. Honour it if present, otherwise derive the total from
    # the recorded trades (the same way the status API does).
    metrics = TRADING_STATE['performance_metrics']
    if 'total_pnl' in metrics:
        realized_pnl = metrics['total_pnl']
    else:
        realized_pnl = sum(trade['pnl'] for trade in metrics.get('trades', []))

    row = [
        consensus_data['timestamp'].isoformat(),
        symbol,
        consensus_data['final_signal'],
        consensus_data['confidence'],
        votes.get('BUY', 0),
        votes.get('SELL', 0),
        votes.get('HOLD', 0),
        votes.get('ERROR', 0),
        consensus_data['buy_percentage'],
        consensus_data['sell_percentage'],
        consensus_data['hold_percentage'],
        action_taken,
        action_reason,
        current_price,
        position.get('position_type', 'NONE'),
        position.get('entry_price', 0),
        realized_pnl
    ]
    with open(CSV_FILENAME, 'a', newline='') as f:
        csv.writer(f).writerow(row)
# --- WebSocket Data Feed ---
async def binance_websocket_listener():
    """Stream ticker updates into the '1m' frames, reconnecting forever.

    Every update for a symbol in ``TARGET_SYMBOLS`` is appended as one row to
    that symbol's '1m' frame, which is kept at a maximum of 200 rows. Any
    exception (including a dropped connection) is logged and followed by a
    reconnect after 5 seconds.

    Row timestamps are created timezone-aware in UTC. The seeded frames use a
    UTC-aware index as well, and concatenating naive and aware timestamps
    would degrade the index to plain objects.

    NOTE(review): each ticker message becomes its own row, so the '1m' frame
    holds one row per update rather than one row per minute; the 'h'/'l'/'v'
    fields are presumably rolling-window values, not per-bar ones — confirm
    against the exchange's stream documentation.
    """
    while True:
        try:
            async with websockets.connect(WEBSOCKET_URL) as websocket:
                logging.info("Connected to Binance WebSocket")
                while True:
                    message = await websocket.recv()
                    data = json.loads(message)
                    # Process miniTicker data
                    for ticker in data:
                        symbol = ticker['s']
                        if symbol not in TARGET_SYMBOLS:
                            continue
                        values = {
                            'Close': [float(ticker['c'])],
                            'High': [float(ticker['h'])],
                            'Low': [float(ticker['l'])],
                            'Volume': [float(ticker['v'])]
                        }
                        event_time = pd.to_datetime(ticker['E'], unit='ms', utc=True)
                        new_data = pd.DataFrame(values, index=[event_time])

                        # Update 1m data, keeping at most 200 rows in total.
                        frames = ALL_PERPS_DATA[symbol]
                        if not frames['1m'].empty:
                            frames['1m'] = pd.concat([
                                frames['1m'].iloc[-199:],
                                new_data
                            ])
                        else:
                            frames['1m'] = new_data
        except Exception as e:
            logging.error(f"WebSocket error: {e}, reconnecting in 5 seconds...")
            await asyncio.sleep(5)
# --- Main Trading Loop ---
async def run_trading_engine():
    """Run the analyse -> vote -> trade -> log cycle forever.

    Each pass builds a prompt per symbol from the latest row of its daily,
    4-hour and 1-minute frames, gathers the model consensus for all symbols
    concurrently, applies the resulting trading decision, logs it to CSV and
    records it in ``TRADING_STATE['signals_history']`` so the HTTP API has
    something to serve. Passes are spaced ``ANALYSIS_FREQUENCY_SECONDS``
    apart; after an unexpected error the loop waits 5 seconds and retries.
    """
    logging.info("Starting AI Trading Engine with 33-model consensus...")
    # Initialize CSV log
    initialize_csv_log()
    # Create mock data for initial testing
    initialize_mock_data()

    # Upper bound on retained signal records, so a long-running process does
    # not accumulate history without limit.
    max_signal_history = 1000

    while True:
        try:
            start_time = time.time()

            # Run consensus analysis for all symbols
            consensus_tasks = []
            for symbol in TARGET_SYMBOLS:
                # Prepare data and create prompt
                data_frames = ALL_PERPS_DATA[symbol]
                if data_frames['1d'].empty:
                    continue
                df_daily = add_technical_indicators(data_frames['1d'])
                df_4h = add_technical_indicators(data_frames['4h'])
                df_1m = add_technical_indicators(data_frames['1m'])
                if df_daily.empty or df_4h.empty or df_1m.empty:
                    continue
                prompt = format_data_for_gpt(
                    df_daily.iloc[-1], df_4h.iloc[-1], df_1m.iloc[-1], symbol
                )
                consensus_tasks.append(get_consensus_for_symbol(symbol, prompt))

            if consensus_tasks:
                # Get all consensus results
                all_consensus = await asyncio.gather(*consensus_tasks)
                history = TRADING_STATE['signals_history']

                # Execute trading decisions
                for consensus in all_consensus:
                    symbol = consensus['symbol']
                    minute_frame = ALL_PERPS_DATA[symbol]['1m']
                    # 0 signals "no price available".
                    current_price = minute_frame['Close'].iloc[-1] if not minute_frame.empty else 0
                    action_data = execute_trading_decision(symbol, consensus, current_price)
                    log_to_csv(consensus, action_data, current_price)

                    # Publish the result for /api/status and /api/consensus;
                    # previously nothing ever wrote to this list.
                    history.append({
                        **consensus,
                        'action_taken': action_data[0],
                        'action_reason': action_data[1],
                    })
                    del history[:-max_signal_history]

                    # Log results
                    logging.info(f"{consensus['symbol']}: {consensus['final_signal']} "
                                 f"(Conf: {consensus['confidence']:.2f}) "
                                 f"Votes: B{consensus['vote_tally'].get('BUY', 0)}/"
                                 f"S{consensus['vote_tally'].get('SELL', 0)}/"
                                 f"H{consensus['vote_tally'].get('HOLD', 0)}/"
                                 f"E{consensus['vote_tally'].get('ERROR', 0)} "
                                 f"Action: {action_data[0]}")

            # Calculate sleep time to maintain frequency
            processing_time = time.time() - start_time
            sleep_time = max(0, ANALYSIS_FREQUENCY_SECONDS - processing_time)
            await asyncio.sleep(sleep_time)
        except Exception as e:
            logging.error(f"Trading engine error: {e}")
            await asyncio.sleep(5)
def initialize_mock_data():
    """Seed every symbol's '1m', '4h' and '1d' frames with synthetic bars.

    Each timeframe receives its own 200-bar frame ending now (UTC), spaced
    by that timeframe's bar length. Previously the 4h and 1d frames were
    produced by taking every 240th / 1440th row of the 200-row minute frame,
    which left exactly one row in each; that is too few for any indicator
    to be computed.

    The synthetic series is a slow upward drift with a repeating 10-bar
    wiggle; BTCUSDT starts at 30000, every other symbol at 2000.
    """
    now = datetime.datetime.now(pytz.utc)
    bar_count = 200
    # Bar length per timeframe. Timedelta objects are used instead of
    # frequency strings because the accepted string aliases differ between
    # pandas releases.
    bar_lengths = {
        '1m': pd.Timedelta(minutes=1),
        '4h': pd.Timedelta(hours=4),
        '1d': pd.Timedelta(days=1),
    }
    for symbol in TARGET_SYMBOLS:
        base_price = 30000 if symbol == 'BTCUSDT' else 2000
        for timeframe, bar_length in bar_lengths.items():
            # A fresh frame per timeframe, so none is a view of another.
            ALL_PERPS_DATA[symbol][timeframe] = pd.DataFrame({
                'Close': [base_price + i * 0.1 + (i % 10 - 5) for i in range(bar_count)],
                'High': [base_price + i * 0.1 + 2 for i in range(bar_count)],
                'Low': [base_price + i * 0.1 - 2 for i in range(bar_count)],
                'Volume': [1000 + i * 10 for i in range(bar_count)]
            }, index=pd.date_range(end=now, periods=bar_count, freq=bar_length))
# --- HTML Interface for Hugging Face ---
# Single-page dashboard served at '/'. Its script polls '/api/status' every
# 5 seconds and renders three parts of the JSON reply: 'signals', 'metrics'
# and 'recent_trades'. Trade rows use the keys of the trade records stored by
# the engine (exit_time, position_type, exit_price, pnl), and vote counts are
# derived from each signal's own 'total_models'.
# The string is rendered with render_template_string, so it must not contain
# Jinja delimiters such as a doubled opening brace.
HTML_INTERFACE = """
<!DOCTYPE html>
<html>
<head>
<title>33-Model AI Trading System</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; background: #f5f5f5; }
.container { max-width: 1200px; margin: 0 auto; }
.header { background: #2c3e50; color: white; padding: 20px; border-radius: 8px; }
.dashboard { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; margin: 20px 0; }
.card { background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
.signal-buy { color: #27ae60; font-weight: bold; }
.signal-sell { color: #e74c3c; font-weight: bold; }
.signal-hold { color: #f39c12; font-weight: bold; }
table { width: 100%; border-collapse: collapse; }
th, td { padding: 8px 12px; text-align: left; border-bottom: 1px solid #ddd; }
th { background: #f8f9fa; }
.progress-bar { background: #ecf0f1; border-radius: 4px; height: 20px; }
.progress-fill { height: 100%; border-radius: 4px; }
.buy-fill { background: #27ae60; }
.sell-fill { background: #e74c3c; }
.hold-fill { background: #f39c12; }
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>33-Model AI Trading System</h1>
<p>Real-time trading signals from 33 different AI models</p>
</div>
<div class="dashboard">
<div class="card">
<h2>Current Signals</h2>
<div id="current-signals">
<p>Loading signals...</p>
</div>
</div>
<div class="card">
<h2>Performance Metrics</h2>
<div id="performance-metrics">
<p>Loading metrics...</p>
</div>
</div>
</div>
<div class="card">
<h2>Recent Trading Activity</h2>
<div id="trading-activity">
<p>Loading activity...</p>
</div>
</div>
<div class="card">
<h2>Model Consensus Details</h2>
<div id="consensus-details">
<p>Loading consensus data...</p>
</div>
</div>
</div>
<script>
function updateDashboard() {
fetch('/api/status')
.then(response => response.json())
.then(data => {
// Update current signals
let signalsHtml = '<table><tr><th>Symbol</th><th>Signal</th><th>Confidence</th><th>Vote Distribution</th><th>Action</th></tr>';
data.signals.forEach(signal => {
signalsHtml += `
<tr>
<td>${signal.symbol}</td>
<td class="signal-${signal.final_signal.toLowerCase()}">${signal.final_signal}</td>
<td>${(signal.confidence * 100).toFixed(1)}%</td>
<td>
<div class="progress-bar">
<div class="progress-fill buy-fill" style="width: ${signal.buy_percentage * 100}%"></div>
<div class="progress-fill sell-fill" style="width: ${signal.sell_percentage * 100}%"></div>
<div class="progress-fill hold-fill" style="width: ${signal.hold_percentage * 100}%"></div>
</div>
B:${Math.round(signal.buy_percentage * signal.total_models)} | S:${Math.round(signal.sell_percentage * signal.total_models)} | H:${Math.round(signal.hold_percentage * signal.total_models)}
</td>
<td>${signal.action_taken || 'NONE'}</td>
</tr>
`;
});
signalsHtml += '</table>';
document.getElementById('current-signals').innerHTML = signalsHtml;
// Update performance metrics
let metricsHtml = `
<p>Total Trades: ${data.metrics.total_trades}</p>
<p>Active Positions: ${data.metrics.active_positions}</p>
<p>Total PnL: ${data.metrics.total_pnl.toFixed(4)}</p>
<p>Win Rate: ${data.metrics.win_rate}%</p>
`;
document.getElementById('performance-metrics').innerHTML = metricsHtml;
// Update trading activity
let activityHtml = '<table><tr><th>Time</th><th>Symbol</th><th>Action</th><th>Price</th><th>PnL</th></tr>';
data.recent_trades.forEach(trade => {
activityHtml += `
<tr>
<td>${new Date(trade.exit_time).toLocaleTimeString()}</td>
<td>${trade.symbol}</td>
<td>EXIT ${trade.position_type}</td>
<td>${trade.exit_price.toFixed(2)}</td>
<td>${typeof trade.pnl === 'number' ? trade.pnl.toFixed(4) : 'N/A'}</td>
</tr>
`;
});
activityHtml += '</table>';
document.getElementById('trading-activity').innerHTML = activityHtml;
})
.catch(error => {
console.error('Error fetching data:', error);
});
}
// Update every 5 seconds
setInterval(updateDashboard, 5000);
updateDashboard();
</script>
</body>
</html>
"""
# --- Flask App for Hugging Face Spaces ---
from flask import Flask, jsonify, request, render_template_string
# Flask application object; the route handlers below are registered on it.
app = Flask(__name__)
@app.route('/')
def home():
    """Serve the dashboard page held in HTML_INTERFACE."""
    # The page is passed through the template renderer without any context
    # variables.
    return render_template_string(HTML_INTERFACE)
@app.route('/api/status')
def api_status():
    """Return the dashboard payload as JSON.

    The reply carries the five most recent signal records, aggregate trade
    metrics (trade count, open positions, summed PnL, win rate in percent
    rounded to one decimal) and the ten most recent closed trades.
    """
    trades = TRADING_STATE['performance_metrics'].get('trades', [])
    total_trades = len(trades)
    winning_trades = sum(1 for trade in trades if trade['pnl'] > 0)
    # max(..., 1) keeps the rate at 0 instead of dividing by zero.
    win_rate = winning_trades / max(total_trades, 1) * 100

    metrics = {
        'total_trades': total_trades,
        'active_positions': len(TRADING_STATE['positions']),
        'total_pnl': sum(trade['pnl'] for trade in trades),
        'win_rate': round(win_rate, 1)
    }
    return jsonify({
        'signals': TRADING_STATE['signals_history'][-5:],
        'metrics': metrics,
        'recent_trades': trades[-10:]
    })
@app.route('/api/consensus/<symbol>')
def api_consensus(symbol):
    """Return the most recent signal record for *symbol* as JSON.

    The lookup is case-insensitive. Unknown symbols yield a 404 with an
    error body; a known symbol without any recorded signal yields ``{}``.
    """
    wanted = symbol.upper()
    if wanted not in TARGET_SYMBOLS:
        return jsonify({'error': 'Symbol not found'}), 404

    # Walk the history newest-first and stop at the first match.
    newest_first = reversed(TRADING_STATE['signals_history'])
    latest_signal = next(
        (record for record in newest_first if record['symbol'] == wanted),
        {},
    )
    return jsonify(latest_signal)
# --- Main Application Startup ---
async def main():
    """Launch the market-data listener and the trading engine together.

    The CSV log is initialised first; both background jobs are then started
    as tasks and awaited jointly. Neither is expected to finish.
    """
    logging.info("Starting 33-Model AI Trading System...")
    initialize_csv_log()

    background_jobs = [
        asyncio.create_task(binance_websocket_listener()),  # market data feed
        asyncio.create_task(run_trading_engine()),          # analysis + trading
    ]
    await asyncio.gather(*background_jobs)
if __name__ == "__main__":
    # The Flask server occupies the main thread (as needed for Hugging Face
    # Spaces), so the asyncio services get an event loop of their own in a
    # daemon thread that dies with the process.
    def _run_event_loop():
        asyncio.run(main())

    threading.Thread(target=_run_event_loop, daemon=True).start()

    # Run Flask app
    app.run(host="0.0.0.0", port=7860, debug=False)