samsonleegh's picture
Update app.py
859578b verified
import os
import asyncio
import importlib
import json
import tempfile
import traceback
from datetime import datetime, timedelta, timezone
import gradio as gr
import pandas as pd
from dotenv import load_dotenv
import hype_accounts_server # your trading MCP
from agents import Agent, Runner, trace, Tool
from agents.mcp import MCPServerStdio
from memory_utils import load_memories, save_memory, load_memories_df
# ====================== CONFIG ======================
load_dotenv(override=True)
MODEL = os.getenv("OPENAI_MODEL", "gpt-5-mini") # fallback if not set
RUN_INTERVAL_MINS = int(os.getenv("RUN_INTERVAL_MINS", "60"))
HEARTBEAT_SECS = int(os.getenv("HEARTBEAT_SECS", "5"))
MAX_RUN_SECS = int(os.getenv("MAX_RUN_SECS", "500"))
CSV_PATH = os.path.join(tempfile.gettempdir(), "memories_latest.csv")
# If you use server2, change this to 'hl_indicators_server2.py'
TA_SERVER_SCRIPT = os.getenv("TA_SERVER_SCRIPT", "hl_indicators_server.py")
NEWS_SERVER_SCRIPT = os.getenv("NEWS_SERVER_SCRIPT", "crypto_news_scraper_server.py")
# ====================== MCP SERVER FACTORIES ======================
def make_hyperliquid_trader_mcp_servers():
return [
MCPServerStdio(
{
"command": "python3",
"args": ["-u", "hype_accounts_server.py"],
"env": {
"HYPERLIQUID_API_KEY": os.getenv("HYPERLIQUID_API_KEY"),
"HYPERLIQUID_PRIVATE_KEY": os.getenv("HYPERLIQUID_PRIVATE_KEY"),
"HYPERLIQUID_ACCOUNT_ADDRESS": os.getenv("HYPERLIQUID_ACCOUNT_ADDRESS"),
},
},
client_session_timeout_seconds=60,
)
]
def make_crypto_news_mcp_servers():
return [
MCPServerStdio(
{"command": "python3", "args": ["-u", NEWS_SERVER_SCRIPT]},
client_session_timeout_seconds=60,
)
]
def make_technical_analyst_mcp_servers():
return [
MCPServerStdio(
{"command": "python3", "args": ["-u", TA_SERVER_SCRIPT]},
client_session_timeout_seconds=60,
)
]
# ====================== UTILITIES ======================
async def connect_all(servers):
for s in servers:
await s.connect()
async def close_all(servers):
for s in servers:
try:
await s.close()
except Exception:
pass
def now() -> datetime:
"""Return current UTC time (aware)."""
return datetime.now(timezone.utc)
def next_run_dt(interval_mins: int = RUN_INTERVAL_MINS) -> datetime:
"""
Return the next run time, aligned to interval_mins in UTC.
Example: if now=10:07 UTC and interval=15 β†’ 10:15 UTC.
"""
t = now().replace(microsecond=0)
n = int(interval_mins)
if n <= 0:
raise ValueError("interval_mins must be > 0")
step = n * 60
ts = int(t.timestamp())
next_ts = ts - (ts % step) + step
return datetime.fromtimestamp(next_ts, tz=timezone.utc)
def eta_str(target: datetime) -> str:
"""Return time difference between now() and target as 'MM:SS' (UTC)."""
secs = max(0, int((target - now()).total_seconds()))
m, s = divmod(secs, 60)
return f"{m:02d}:{s:02d}"
def write_mem_csv(df):
if df is None or df.empty:
df = pd.DataFrame(columns=["ts", "summary"])
df.to_csv(CSV_PATH, index=False)
return CSV_PATH
# ====================== RESEARCHER AGENTS ======================
async def get_crypto_news_researcher(mcp_servers) -> Agent:
instructions = (
"You are a cryptocurrency researcher. You can call MCP tools to gather news and surface "
"potential opportunities. If the user (or prompt) specifies coins (e.g., HYPE, BTC, ETH, XRP), "
"focus on those; otherwise scan broadly. Be concise and actionable.\n"
f"Time (UTC): {now().strftime('%Y-%m-%d %H:%M:%S')}"
)
return Agent(
name="Crypto news researcher",
instructions=instructions,
model=MODEL,
mcp_servers=mcp_servers,
)
async def get_crypto_news_researcher_tool(mcp_servers) -> Tool:
researcher = await get_crypto_news_researcher(mcp_servers)
return researcher.as_tool(
tool_name="crypto_news_researcher",
tool_description="Research crypto news and summarize opportunities (broad or coin-specific).",
)
async def get_technical_analyst_researcher(mcp_servers) -> Agent:
# Generic; LLM decides which TA tools to call and in what order.
instructions = (
"Role: Cryptocurrency technical researcher using MCP indicator tools.\n"
"Goals:\n"
"- Form a directional bias (long/short/neutral) for each requested coin/timeframe.\n"
"- Summarize key trend/momentum/volatility/volume signals with latest values.\n"
"- Optionally outline a trade *idea skeleton*, without enforcing a fixed strategy.\n"
"Reasoning Hints (non-prescriptive):\n"
"- Trend (MA/EMA alignment, slope); Momentum (RSI/MACD); Volatility (ATR, squeezes); Volume/Flow (OBV/ADL/VWAP context); Strength (ADX).\n"
"- Call out divergences, regime shifts, chop (low ADX), or news-led anomalies.\n"
"- Keep caveats explicit (thin liquidity, event risk, conflicting signals).\n"
"Data Discipline:\n"
"- Use only completed candles (exclude_current_bar=True).\n"
"- Default params unless caller overrides:\n"
" interval='1h', lookback_period=6, limit=600, include_signals=True, exclude_current_bar=True, return_last_only=False.\n"
"- When calling tools, pass only the coin symbol as `name` (eg. BTC, ETH, SOL, XRP, SUI, etc.). No exchange prefixes/suffixes or quote assets.\n"
"\n"
"Output (per coin)\n"
"1) One-line bias + confidence.\n"
"2) Bullet signals with latest values and reasoning.\n"
"3) Optional trade idea.\n"
"4) Caveats. \n\n"
"\n"
f"Time (UTC): {now().strftime('%Y-%m-%d %H:%M:%S')}"
)
return Agent(
name="Crypto technical researcher",
instructions=instructions,
model=MODEL,
mcp_servers=mcp_servers,
)
async def get_technical_analyst_researcher_tool(mcp_servers) -> Tool:
researcher = await get_technical_analyst_researcher(mcp_servers)
return researcher.as_tool(
tool_name="crypto_technical_researcher",
tool_description="Analyze TA indicators and propose trade ideas (coin/interval/lookback optional).",
)
# ====================== TRADER CORE ======================
async def run_trader():
hyper_servers = []
news_servers = []
ta_servers = []
try:
print(f"\nπŸ•’ Starting trading cycle at UTC {now().strftime('%Y-%m-%d %H:%M:%S')}")
# Memory (optional context)
past_memories = load_memories(1)
memory_text = "\n".join(past_memories) if past_memories else "No prior memories."
# Account (assumed async in your hype_accounts_server)
account_details = await hype_accounts_server.get_account_details()
pretty_details = json.dumps(account_details, indent=2) if isinstance(account_details, (dict, list)) else str(account_details)
trader_instructions = (
"You are a cryptocurrency perpetuals trader.\n"
f"Past insights:\n{memory_text}\n\n"
f"Current holdings/balance:\n{pretty_details}\n\n"
"You have tools for news, hourly technical indicators, holdings/balance, and Hyperliquid execution.\n"
"When scanning for crypto news, scan broadly for any oppotunities."
"When calling technical indicator tools, pass only the coin symbol as `name` (eg. BTC, ETH, SOL, XRP, SUI, HYPE, ZEC, etc.) Do NOT include exchange prefixes/suffixes or quote assets (no BINANCE:, no /USDT, no -PERP, no .PERP).\n"
"Carry out trades as you see fit β€” take profits and cut losses as needed."
"Do not wait for instructions or ask for confirmation."
"Each transaction costs 0.04%.\n"
"If signals are unclear, do not trade."
)
# High-level prompt to drive the agent plan
prompt = (
"Step 1 β€” Check holdings & balance.\n"
"Step 2 β€” Scan crypto news broadly for ANY potential coin trading opportunities. Be open minded. Do not limit to a list.\n"
"Step 3 β€” Select 3–5 symbols based on news & holdings. Analyze 1h indicators and propose strategies.\n"
"Step 4 β€” Execute long/short only if edge exists; else abstain.\n"
f"Now (UTC): {now().strftime('%Y-%m-%d %H:%M')}"
)
# Fresh servers each run
hyper_servers = make_hyperliquid_trader_mcp_servers()
news_servers = make_crypto_news_mcp_servers()
ta_servers = make_technical_analyst_mcp_servers()
await connect_all(hyper_servers)
await connect_all(news_servers)
await connect_all(ta_servers)
# Bind tools to their respective servers
crypto_news_tool = await get_crypto_news_researcher_tool(news_servers)
ta_tool = await get_technical_analyst_researcher_tool(ta_servers)
trader = Agent(
name="crypto_trader",
instructions=trader_instructions,
tools=[crypto_news_tool, ta_tool],
mcp_servers=hyper_servers, # trading actions live here
model=MODEL,
)
async def _run():
with trace("crypto_trader"):
return await Runner.run(trader, prompt, max_turns=30)
# result = await asyncio.wait_for(_run(), timeout=MAX_RUN_SECS)
result = await _run()
save_memory(result.final_output)
mem_df = load_memories_df(10) if past_memories else pd.DataFrame(columns=["ts", "summary"])
output_md = (
f"### βœ… Trade Completed UTC {now().strftime('%Y-%m-%d %H:%M')}\n\n"
f"{result.final_output}\n\n"
f"---\n\n"
f"### πŸ’³ Account Details (Post-Run)\n"
f"```\n{pretty_details}\n```"
)
return output_md, mem_df
except asyncio.TimeoutError:
return "❌ Timed out (300s). Consider raising timeout or reducing tool work per cycle.", pd.DataFrame(columns=["ts", "summary"])
except Exception as e:
err = traceback.format_exc()
return f"❌ Error during trading cycle:\n```\n{err}\n```", pd.DataFrame(columns=["ts", "summary"])
finally:
await close_all(hyper_servers)
await close_all(news_servers)
await close_all(ta_servers)
# ====================== SCHEDULER / UI STATE ======================
LATEST_OUTPUT = "⏳ Waiting for first scheduled run…"
LATEST_MEM_DF = pd.DataFrame(columns=["ts", "summary"])
NEXT_RUN_AT = None
IS_RUNNING = False
LAST_RUN_STARTED_AT = None
LAST_RUN_ENDED_AT = None
STOP_TRADING = False
_STATE_LOCK = asyncio.Lock()
_SCHEDULER_STARTED = False
_RUN_LOCK = asyncio.Lock()
async def safe_run_trader(force: bool = False):
global LATEST_OUTPUT, LATEST_MEM_DF, IS_RUNNING, LAST_RUN_STARTED_AT, LAST_RUN_ENDED_AT
if _RUN_LOCK.locked() or (STOP_TRADING and not force):
return
async with _RUN_LOCK:
IS_RUNNING = True
LAST_RUN_STARTED_AT = now()
try:
md, mem_df = await asyncio.wait_for(run_trader(), timeout=MAX_RUN_SECS)
async with _STATE_LOCK:
LATEST_OUTPUT = md or "(no output)"
if isinstance(mem_df, pd.DataFrame) and not mem_df.empty:
LATEST_MEM_DF = mem_df.copy()
else:
stamp = now().strftime("%Y-%m-%d %H:%M:%S")
extra = pd.DataFrame([{"ts": stamp, "summary": "βœ… Run completed"}])
LATEST_MEM_DF = pd.concat([LATEST_MEM_DF, extra], ignore_index=True)
except Exception:
err = traceback.format_exc()
async with _STATE_LOCK:
LATEST_OUTPUT = f"❌ Exception\n```\n{err}\n```"
finally:
LAST_RUN_ENDED_AT = now()
IS_RUNNING = False
# --- Manual "Run Now" trigger (async) ---
async def on_run_now():
# Force a run even if paused; lock prevents overlap
await safe_run_trader(force=True)
# Return the same tuple your timer updates, so the UI refreshes immediately
latest, status, mem_df, csv_path = await ui_poll()
return latest, status, mem_df, csv_path
async def run_scheduler():
global NEXT_RUN_AT
NEXT_RUN_AT = next_run_dt(RUN_INTERVAL_MINS)
# sleep until next aligned tick
await asyncio.sleep(max(0.0, (NEXT_RUN_AT - now()).total_seconds()))
while True:
if not STOP_TRADING:
await safe_run_trader()
NEXT_RUN_AT = next_run_dt(RUN_INTERVAL_MINS)
else:
NEXT_RUN_AT = None
# sleep towards the next run or heartbeat-period while paused
await asyncio.sleep(
HEARTBEAT_SECS if STOP_TRADING else max(0.0, (NEXT_RUN_AT - now()).total_seconds())
)
async def heartbeat():
while True:
state = "⏸️ Paused" if STOP_TRADING else ("🟒 Running" if IS_RUNNING else "🟑 Idle")
print(f"[{now():%H:%M:%S}] πŸ’“ {state}")
await asyncio.sleep(HEARTBEAT_SECS)
async def ensure_scheduler_started():
global _SCHEDULER_STARTED
if not _SCHEDULER_STARTED:
_SCHEDULER_STARTED = True
asyncio.create_task(heartbeat())
asyncio.create_task(run_scheduler())
def status_markdown():
lines = [f"**Status:** {'⏸️ Paused' if STOP_TRADING else ('🟒 Running' if IS_RUNNING else '🟑 Idle')}"]
if LAST_RUN_STARTED_AT:
lines.append(f"**Last start (UTC):** {LAST_RUN_STARTED_AT:%Y-%m-%d %H:%M:%S}")
if LAST_RUN_ENDED_AT:
lines.append(f"**Last end (UTC):** {LAST_RUN_ENDED_AT:%Y-%m-%d %H:%M:%S}")
if NEXT_RUN_AT and not STOP_TRADING:
lines.append(f"**Next run (UTC):** {NEXT_RUN_AT:%Y-%m-%d %H:%M} (ETA {eta_str(NEXT_RUN_AT)})")
if STOP_TRADING:
lines.append("**Scheduler:** waiting for Resume")
return "\n\n".join(lines)
async def ui_poll():
async with _STATE_LOCK:
latest = LATEST_OUTPUT
mem_df = LATEST_MEM_DF.copy()
csv_path = write_mem_csv(mem_df)
return latest, status_markdown(), mem_df, csv_path
def _toggle_stop(flag: bool):
global STOP_TRADING
STOP_TRADING = flag
return status_markdown()
# ====================== GRADIO UI ======================
with gr.Blocks(fill_height=True) as demo:
gr.Markdown("## πŸ“ˆ GPT Crypto Trader β€” Live Board")
gr.Markdown(
"[agent tracer](https://platform.openai.com/logs?api=traces) \n"
"[hyperliquid platform](https://app.hyperliquid.xyz/trade)"
)
with gr.Row():
stop_btn = gr.Button("πŸ›‘ Stop", variant="stop")
resume_btn = gr.Button("▢️ Resume", variant="primary")
run_now_btn = gr.Button("⚑ Run Now", variant="secondary")
latest_md = gr.Markdown(value=LATEST_OUTPUT)
status_md = gr.Markdown(value=status_markdown())
mem_table = gr.Dataframe(value=LATEST_MEM_DF, interactive=False, wrap=True, show_label=False)
mem_csv = gr.File(interactive=False)
stop_btn.click(lambda: _toggle_stop(True), outputs=[status_md])
resume_btn.click(lambda: _toggle_stop(False), outputs=[status_md])
run_now_btn.click(on_run_now, outputs=[latest_md, status_md, mem_table, mem_csv])
demo.load(ensure_scheduler_started)
timer = gr.Timer(5.0, active=True)
timer.tick(ui_poll, outputs=[latest_md, status_md, mem_table, mem_csv])
if __name__ == "__main__":
demo.queue().launch(share=True)