# Realtradingbot / App.py
# sa1646's picture
# Rename Guardian.py to App.py
# 6efa55a verified
import random
from deap import base, creator, tools
class StrategyMesh:
    """Population of randomly parameterised strategy agents, evolved by fitness.

    Every agent is a plain dict with three keys:

    * ``features`` - two distinct names drawn from ``_FEATURES``
    * ``params``   - ``rsi_threshold`` (30-70) and ``volume_mult`` (1.2-3.0)
    * ``fitness``  - score used for ranking, initialised to 0.0
    """

    _FEATURES = ("rsi", "volume", "funding", "volatility")

    def __init__(self, n_agents=100):
        """Create ``n_agents`` random agents."""
        self.n_agents = n_agents
        self.agents = self._init_agents()

    def _init_agents(self):
        """Return a list of ``n_agents`` freshly randomised agents."""
        return [self._random_agent() for _ in range(self.n_agents)]

    def evolve(self, performance_data):
        """Replace the population with elites, mutated elites and newcomers.

        The top 20% of agents by ``fitness`` survive unchanged, each of them
        contributes one mutated copy, and the remaining slots are filled with
        random agents so that the population size stays at ``n_agents``
        from one generation to the next.

        ``performance_data`` is accepted for interface compatibility but is
        not read here; ranking uses each agent's own ``fitness`` value.
        """
        ranked = sorted(self.agents, key=lambda a: a["fitness"], reverse=True)
        elites = ranked[:int(0.2 * len(ranked))]
        mutants = [self._mutate(elite) for elite in elites]
        # Top the population back up; without this it shrinks every generation.
        n_fill = max(0, self.n_agents - len(elites) - len(mutants))
        newcomers = [self._random_agent() for _ in range(n_fill)]
        self.agents = elites + mutants + newcomers

    def _mutate(self, agent):
        """Return a copy of ``agent`` with ``rsi_threshold`` nudged by +/-5.

        The result is clamped to the range 20-80. The nested ``params`` dict
        (and the ``features`` list, when present) are copied so that the
        parent agent is left untouched.
        """
        child = dict(agent)
        child["params"] = dict(agent["params"])
        if "features" in child:
            child["features"] = list(child["features"])
        params = child["params"]
        if "rsi_threshold" in params:
            nudged = params["rsi_threshold"] + random.uniform(-5, 5)
            params["rsi_threshold"] = max(20, min(80, nudged))
        return child

    def _random_agent(self):
        """Build one agent with two random features and random parameters."""
        return {
            "features": random.sample(self._FEATURES, k=2),
            "params": {
                "rsi_threshold": random.uniform(30, 70),
                "volume_mult": random.uniform(1.2, 3.0),
            },
            "fitness": 0.0,
        }
class RiskController:
    """Applies the ``risk`` section of the bot configuration."""

    def __init__(self, config):
        """Keep a reference to ``config["risk"]`` and zero the loss counter."""
        risk_settings = config["risk"]
        self.config = risk_settings
        self.daily_loss = 0.0

    def should_hedge(self, volatility):
        """Tell whether hedging applies at the given volatility.

        Returns the configured ``hedge_on_volatility`` value itself when it is
        falsy; otherwise returns whether ``volatility`` exceeds 0.02.
        """
        hedging_enabled = self.config["hedge_on_volatility"]
        if not hedging_enabled:
            return hedging_enabled
        return volatility > 0.02

    def can_trade(self, account_balance, current_drawdown):
        """Return ``(allowed, reason)`` for the current drawdown.

        Trading is refused once ``current_drawdown`` is strictly greater than
        the configured ``max_daily_loss``. ``account_balance`` is not used.
        """
        limit = self.config["max_daily_loss"]
        breached = current_drawdown > limit
        return (False, "MAX DAILY LOSS HIT") if breached else (True, "OK")
class Vault:
    """Profit reserve that can release part of its balance during a drawdown."""

    def __init__(self, config):
        """Start with an empty reserve and read the ``vault`` settings."""
        self.reserve = 0.0
        vault_cfg = config["vault"]
        self.ratio = vault_cfg["profit_reserve_ratio"]
        self.threshold = vault_cfg["drawdown_threshold"]
        self.inject_ratio = vault_cfg["inject_ratio"]

    def add_profit(self, profit):
        """Move ``profit * ratio`` into the reserve and return that amount."""
        set_aside = profit * self.ratio
        self.reserve = self.reserve + set_aside
        return set_aside

    def inject_on_drawdown(self, drawdown):
        """Release ``inject_ratio`` of the reserve when the drawdown is too deep.

        Returns the released amount, or 0.0 when ``drawdown`` does not exceed
        the threshold or the reserve is not positive.
        """
        triggered = drawdown > self.threshold and self.reserve > 0
        if not triggered:
            return 0.0
        released = self.reserve * self.inject_ratio
        self.reserve = self.reserve - released
        return released
import json
import random
import ccxt
class MarketFeed:
    """Price source: a ccxt Binance client in live mode, random prices otherwise."""

    def __init__(self, config_path="config/settings.json"):
        """Load the JSON config and, in live mode, build the exchange client.

        Args:
            config_path: Path to a JSON file with a ``mode`` key. In ``"live"``
                mode it must also provide ``api.key`` and ``api.secret``.
        """
        with open(config_path, encoding="utf-8") as f:
            self.config = json.load(f)
        self.mode = self.config["mode"]
        # Always defined, so get_ticker can tell the two cases apart without
        # risking an AttributeError for modes other than "live" and "paper".
        self.exchange = None
        if self.mode == "live":
            self.exchange = ccxt.binance({
                'apiKey': self.config["api"]["key"],
                'secret': self.config["api"]["secret"],
                'enableRateLimit': True,
            })

    def get_ticker(self, symbol="BTC/USDT"):
        """Return ``{"symbol", "price", "volume"}`` for ``symbol``.

        Without an exchange client (any mode other than ``"live"``) the price
        is 60000 +/- 100 rounded to cents and the volume is uniform in
        50-200. In live mode the values come from ``fetch_ticker``'s ``last``
        and ``baseVolume`` fields.
        """
        if self.exchange is None:
            base_price = 60000
            return {
                "symbol": symbol,
                "price": round(base_price + random.uniform(-100, 100), 2),
                "volume": random.uniform(50, 200),
            }
        ticker = self.exchange.fetch_ticker(symbol)
        return {
            "symbol": symbol,
            "price": ticker["last"],
            "volume": ticker["baseVolume"],
        }
class PaperTrader:
    """Simulated trader that adjusts an in-memory balance instead of placing orders."""

    def __init__(self):
        """Start with a balance of 1000.0 and an empty trade log."""
        self.balance = 1000.0
        self.trades = []

    def execute_trade(self, symbol, side, price):
        """Record a simulated $5 trade and return ``{"status": "filled"}``.

        ``side == "buy"`` debits the balance; every other value is treated
        as a sell, credits the balance and is logged with side ``"sell"``.
        """
        amount = 5.0 / price  # $5 trade
        cost = amount * price
        is_buy = side == "buy"
        self.balance += -cost if is_buy else cost
        record = {
            "symbol": symbol,
            "side": "buy" if is_buy else "sell",
            "price": price,
            "amount": amount,
        }
        self.trades.append(record)
        print(f"📝 PAPER TRADE: {side} {amount:.4f} {symbol} at ${price}")
        return {"status": "filled"}
import ccxt
import json
class LiveTrader:
    """Places limit orders on Binance through ccxt."""

    def __init__(self, config_path="config/settings.json"):
        """Read API credentials and the ``risk`` section from the JSON config."""
        with open(config_path) as cfg_file:
            settings = json.load(cfg_file)
        credentials = {
            'apiKey': settings["api"]["key"],
            'secret': settings["api"]["secret"],
        }
        self.exchange = ccxt.binance(credentials)
        self.risk = settings["risk"]

    def execute_trade(self, symbol, side, price):
        """Submit a limit order at ``price`` and return the exchange response.

        The order size is ``risk["position_size_usd"] / price``, falling back
        to a $5 notional when that key is absent. ``side == "buy"`` places a
        buy; any other value places a sell. Any exception raised while
        submitting is printed and turned into a ``None`` return.
        """
        notional_usd = self.risk.get("position_size_usd", 5.0)
        amount = notional_usd / price
        try:
            # The method lookup stays inside the try so a missing method is
            # reported like any other failed order.
            if side == "buy":
                submit = self.exchange.create_limit_buy_order
            else:
                submit = self.exchange.create_limit_sell_order
            order = submit(symbol, amount, price)
            print(f"✅ LIVE TRADE: {side} {amount:.4f} {symbol} at ${price}")
            return order
        except Exception as err:
            print(f"❌ Trade failed: {str(err)}")
            return None
import requests
import json
class TelegramBot:
    """Best-effort alert sender using the Telegram Bot API ``sendMessage`` call."""

    # Seconds to wait for Telegram before giving up, so that a stalled
    # connection cannot block the caller indefinitely.
    REQUEST_TIMEOUT = 10

    def __init__(self, config_path="config/settings.json"):
        """Read ``enabled``, ``bot_token`` and ``chat_id`` from ``config["telegram"]``."""
        with open(config_path, encoding="utf-8") as f:
            config = json.load(f)
        self.enabled = config["telegram"]["enabled"]
        self.token = config["telegram"]["bot_token"]
        self.chat_id = config["telegram"]["chat_id"]

    def send_alert(self, message):
        """Post ``message`` to the configured chat; never raises on failure.

        Does nothing when alerts are disabled. Delivery problems are printed
        rather than raised so that alerting cannot take the caller down.
        """
        if not self.enabled:
            return
        url = f"https://api.telegram.org/bot{self.token}/sendMessage"
        payload = {
            "chat_id": self.chat_id,
            "text": f"🦅 Pegasus Ultra\n{message}",
            "parse_mode": "Markdown"
        }
        try:
            response = requests.post(url, json=payload, timeout=self.REQUEST_TIMEOUT)
            if not response.ok:
                print(f"⚠️ Telegram alert rejected: HTTP {response.status_code}")
        except Exception as e:
            # Only the exception type is printed: the request URL embeds the
            # bot token and may be included in the exception text.
            print(f"⚠️ Telegram alert failed: {type(e).__name__}")
import gradio as gr
import threading
import json
from data.market_feed import MarketFeed
from execution.paper_trader import PaperTrader
from execution.live_trader import LiveTrader
from core.vault import Vault
class PegasusEngine:
    """Wires a market feed, a vault and a trader together and runs the trade loop."""

    def __init__(self):
        """Load ``config/settings.json`` and build the collaborating objects.

        A ``LiveTrader`` is used when the configured mode is ``"live"``; any
        other mode gets a ``PaperTrader``.
        """
        with open("config/settings.json", encoding="utf-8") as f:
            self.config = json.load(f)
        self.mode = self.config["mode"]
        self.feed = MarketFeed()
        self.vault = Vault(self.config)
        self.trades = []
        if self.mode == "live":
            self.trader = LiveTrader()
        else:
            self.trader = PaperTrader()

    def run(self, iterations=100):
        """Fetch a ticker and place a BTC/USDT buy, ``iterations`` times.

        Args:
            iterations: Number of loop passes. Defaults to 100. Pass ``None``
                to keep running until the process stops (24/7 operation).

        Each fetched ticker dict is appended to ``self.trades``.
        """
        completed = 0
        while iterations is None or completed < iterations:
            tick = self.feed.get_ticker()
            self.trader.execute_trade("BTC/USDT", "buy", tick["price"])
            self.trades.append(tick)
            completed += 1
# Module-wide engine instance; stays None until start_engine() creates it.
engine = None


def start_engine():
    """Create the engine on first call, run it in a daemon thread, report the mode.

    Later calls do not start a second engine; they return the same message
    for the engine that already exists.
    """
    global engine
    if engine is None:
        engine = PegasusEngine()
        worker = threading.Thread(target=engine.run, daemon=True)
        worker.start()
    return "".join(["✅ Engine started in ", engine.mode, " mode!"])
def get_status():
    """Return a three-line status summary, or a notice when no engine exists."""
    if not engine:
        return "Engine not running"
    summary = [
        f"Mode: {engine.mode}",
        f"Trades: {len(engine.trades)}",
        f"Vault: ${engine.vault.reserve:.2f}",
    ]
    return "\n".join(summary)
# Dashboard layout: a title, then a start button next to a read-only status box.
# NOTE(review): the button and the status box are assumed to share one row;
# confirm against the intended layout.
with gr.Blocks() as demo:
    gr.Markdown(value="# 🦅 Pegasus Ultra – Advanced Trading")
    with gr.Row():
        start_btn = gr.Button(value="Start Engine")
        status = gr.Textbox(label="Status", interactive=False)
    # Clicking the button starts the engine and shows its start-up message.
    start_btn.click(fn=start_engine, outputs=status)
    # On page load, fill the status box and keep refreshing it (every=5).
    demo.load(fn=get_status, every=5, outputs=status)


if __name__ == "__main__":
    demo.launch(server_port=7860)
import subprocess
import time
import sys
import os
def run_main(restart_delay=10):
    """Keep ``monitor/web_dashboard.py`` running, relaunching it whenever it stops.

    The dashboard is started as a child process of the current interpreter.
    After it exits, or after launching it fails, the loop waits
    ``restart_delay`` seconds and starts it again.

    Args:
        restart_delay: Seconds to wait between launches. Defaults to 10.

    Pressing Ctrl-C ends the loop, whether the dashboard is running or the
    guardian is waiting to restart it.
    """
    try:
        while True:
            try:
                result = subprocess.run(
                    [sys.executable, "monitor/web_dashboard.py"],
                    cwd=os.getcwd(),
                )
                if result.returncode != 0:
                    print(f"⚠️ Dashboard exited. Restarting in {restart_delay}s...")
            except Exception as e:
                print(f"⚠️ Crash detected: {e}. Restarting in {restart_delay}s...")
            time.sleep(restart_delay)
    except KeyboardInterrupt:
        # Caught around the whole loop so an interrupt during the sleep also
        # stops the guardian without a traceback.
        return


if __name__ == "__main__":
    print("🛡️ Pegasus Guardian: Auto-recovery enabled")
    run_main()