"""Scheduled GPT crypto trader with a Gradio live board.

A background scheduler runs one trading cycle every ``RUN_INTERVAL_MINS``
minutes (aligned to UTC). Each cycle spins up three stdio MCP servers
(Hyperliquid account/execution, crypto news, technical indicators), lets a
trader agent act through them, stores the agent's final output as a memory,
and publishes the result to the Gradio UI.
"""

import os
import asyncio
import importlib
import json
import tempfile
import traceback
from datetime import datetime, timedelta, timezone

import gradio as gr
import pandas as pd
from dotenv import load_dotenv

import hype_accounts_server  # your trading MCP
from agents import Agent, Runner, trace, Tool
from agents.mcp import MCPServerStdio
from memory_utils import load_memories, save_memory, load_memories_df

# ====================== CONFIG ======================
load_dotenv(override=True)

MODEL = os.getenv("OPENAI_MODEL", "gpt-5-mini")  # fallback if not set
RUN_INTERVAL_MINS = int(os.getenv("RUN_INTERVAL_MINS", "60"))
HEARTBEAT_SECS = int(os.getenv("HEARTBEAT_SECS", "5"))
MAX_RUN_SECS = int(os.getenv("MAX_RUN_SECS", "500"))
CSV_PATH = os.path.join(tempfile.gettempdir(), "memories_latest.csv")

# If you use server2, change this to 'hl_indicators_server2.py'
TA_SERVER_SCRIPT = os.getenv("TA_SERVER_SCRIPT", "hl_indicators_server.py")
NEWS_SERVER_SCRIPT = os.getenv("NEWS_SERVER_SCRIPT", "crypto_news_scraper_server.py")

# Environment variables forwarded to the Hyperliquid MCP subprocess.
_HYPERLIQUID_ENV_VARS = (
    "HYPERLIQUID_API_KEY",
    "HYPERLIQUID_PRIVATE_KEY",
    "HYPERLIQUID_ACCOUNT_ADDRESS",
)


# ====================== MCP SERVER FACTORIES ======================
def make_hyperliquid_trader_mcp_servers():
    """Build the (not yet connected) stdio MCP server list for Hyperliquid.

    Returns:
        A one-element list holding an ``MCPServerStdio`` that launches
        ``hype_accounts_server.py`` with the Hyperliquid credentials in its
        environment.
    """
    # Forward only the credentials that are actually set: an unset variable
    # would otherwise be passed as None, which is not a valid env value.
    env = {name: os.environ[name] for name in _HYPERLIQUID_ENV_VARS if name in os.environ}
    return [
        MCPServerStdio(
            {
                "command": "python3",
                "args": ["-u", "hype_accounts_server.py"],
                "env": env,
            },
            client_session_timeout_seconds=60,
        )
    ]


def make_crypto_news_mcp_servers():
    """Build the (not yet connected) stdio MCP server list for crypto news."""
    return [
        MCPServerStdio(
            {"command": "python3", "args": ["-u", NEWS_SERVER_SCRIPT]},
            client_session_timeout_seconds=60,
        )
    ]


def make_technical_analyst_mcp_servers():
    """Build the (not yet connected) stdio MCP server list for TA indicators."""
    return [
        MCPServerStdio(
            {"command": "python3", "args": ["-u", TA_SERVER_SCRIPT]},
            client_session_timeout_seconds=60,
        )
    ]


# ====================== UTILITIES ======================
async def connect_all(servers):
    """Connect every MCP server in ``servers``, one after another."""
    for s in servers:
        await s.connect()


async def close_all(servers):
    """Shut down every MCP server in ``servers`` on a best-effort basis.

    A failure on one server is reported and does not prevent the remaining
    servers from being shut down; nothing is raised to the caller.
    """
    for s in servers:
        # The Agents SDK MCP servers expose ``cleanup()``; ``close()`` is kept
        # as a fallback for server objects that provide it instead.
        closer = getattr(s, "cleanup", None) or getattr(s, "close", None)
        label = getattr(s, "name", None) or type(s).__name__
        if closer is None:
            print(f"⚠️ MCP server {label!r} has no cleanup()/close(); skipping shutdown")
            continue
        try:
            await closer()
        except Exception as exc:
            # Best effort: report instead of hiding the failure.
            print(f"⚠️ Failed to shut down MCP server {label!r}: {exc!r}")


def now() -> datetime:
    """Return current UTC time (aware)."""
    return datetime.now(timezone.utc)


def next_run_dt(interval_mins: int = RUN_INTERVAL_MINS) -> datetime:
    """
    Return the next run time, aligned to interval_mins in UTC.
    Example: if now=10:07 UTC and interval=15 → 10:15 UTC.

    Raises:
        ValueError: if ``interval_mins`` is not positive.
    """
    t = now().replace(microsecond=0)
    n = int(interval_mins)
    if n <= 0:
        raise ValueError("interval_mins must be > 0")
    step = n * 60
    ts = int(t.timestamp())
    # Round down to the current interval boundary, then move one step ahead,
    # so the result is always strictly in the future.
    next_ts = ts - (ts % step) + step
    return datetime.fromtimestamp(next_ts, tz=timezone.utc)


def eta_str(target: datetime) -> str:
    """Return time difference between now() and target as 'MM:SS' (UTC).

    Targets in the past are clamped to ``00:00``.
    """
    secs = max(0, int((target - now()).total_seconds()))
    m, s = divmod(secs, 60)
    return f"{m:02d}:{s:02d}"


def _empty_mem_df() -> pd.DataFrame:
    """Return an empty memories table with the ``ts``/``summary`` columns."""
    return pd.DataFrame(columns=["ts", "summary"])


def write_mem_csv(df):
    """Write the memories table to ``CSV_PATH`` and return that path.

    ``None`` or an empty frame is written as a header-only CSV.
    """
    if df is None or df.empty:
        df = _empty_mem_df()
    df.to_csv(CSV_PATH, index=False)
    return CSV_PATH


def _format_account_details(details) -> str:
    """Render account details as text for prompts and the UI."""
    if isinstance(details, (dict, list)):
        # default=str keeps display working if the payload contains values
        # json cannot serialize natively; this text is for display only.
        return json.dumps(details, indent=2, default=str)
    return str(details)


# ====================== RESEARCHER AGENTS ======================
async def get_crypto_news_researcher(mcp_servers) -> Agent:
    """Create the news-research agent bound to ``mcp_servers``."""
    instructions = (
        "You are a cryptocurrency researcher. You can call MCP tools to gather news and surface "
        "potential opportunities. If the user (or prompt) specifies coins (e.g., HYPE, BTC, ETH, XRP), "
        "focus on those; otherwise scan broadly. Be concise and actionable.\n"
        f"Time (UTC): {now().strftime('%Y-%m-%d %H:%M:%S')}"
    )
    return Agent(
        name="Crypto news researcher",
        instructions=instructions,
        model=MODEL,
        mcp_servers=mcp_servers,
    )


async def get_crypto_news_researcher_tool(mcp_servers) -> Tool:
    """Wrap the news-research agent as a tool callable by another agent."""
    researcher = await get_crypto_news_researcher(mcp_servers)
    return researcher.as_tool(
        tool_name="crypto_news_researcher",
        tool_description="Research crypto news and summarize opportunities (broad or coin-specific).",
    )


async def get_technical_analyst_researcher(mcp_servers) -> Agent:
    """Create the technical-analysis agent bound to ``mcp_servers``."""
    # Generic; LLM decides which TA tools to call and in what order.
    instructions = (
        "Role: Cryptocurrency technical researcher using MCP indicator tools.\n"
        "Goals:\n"
        "- Form a directional bias (long/short/neutral) for each requested coin/timeframe.\n"
        "- Summarize key trend/momentum/volatility/volume signals with latest values.\n"
        "- Optionally outline a trade *idea skeleton*, without enforcing a fixed strategy.\n"
        "Reasoning Hints (non-prescriptive):\n"
        "- Trend (MA/EMA alignment, slope); Momentum (RSI/MACD); Volatility (ATR, squeezes); Volume/Flow (OBV/ADL/VWAP context); Strength (ADX).\n"
        "- Call out divergences, regime shifts, chop (low ADX), or news-led anomalies.\n"
        "- Keep caveats explicit (thin liquidity, event risk, conflicting signals).\n"
        "Data Discipline:\n"
        "- Use only completed candles (exclude_current_bar=True).\n"
        "- Default params unless caller overrides:\n"
        " interval='1h', lookback_period=6, limit=600, include_signals=True, exclude_current_bar=True, return_last_only=False.\n"
        "- When calling tools, pass only the coin symbol as `name` (eg. BTC, ETH, SOL, XRP, SUI, etc.). No exchange prefixes/suffixes or quote assets.\n"
        "\n"
        "Output (per coin)\n"
        "1) One-line bias + confidence.\n"
        "2) Bullet signals with latest values and reasoning.\n"
        "3) Optional trade idea.\n"
        "4) Caveats. \n\n"
        "\n"
        f"Time (UTC): {now().strftime('%Y-%m-%d %H:%M:%S')}"
    )
    return Agent(
        name="Crypto technical researcher",
        instructions=instructions,
        model=MODEL,
        mcp_servers=mcp_servers,
    )


async def get_technical_analyst_researcher_tool(mcp_servers) -> Tool:
    """Wrap the technical-analysis agent as a tool callable by another agent."""
    researcher = await get_technical_analyst_researcher(mcp_servers)
    return researcher.as_tool(
        tool_name="crypto_technical_researcher",
        tool_description="Analyze TA indicators and propose trade ideas (coin/interval/lookback optional).",
    )


# ====================== TRADER CORE ======================
async def run_trader():
    """Run one full trading cycle.

    Connects fresh MCP servers, runs the trader agent, saves its final output
    as a memory and always shuts the servers down again.

    Returns:
        ``(markdown_report, memories_df)``. Errors are not raised; they are
        returned as a markdown report together with an empty memories frame.
        No timeout is applied here: the caller (``safe_run_trader``) bounds
        the whole cycle with ``MAX_RUN_SECS``.
    """
    hyper_servers = []
    news_servers = []
    ta_servers = []
    try:
        print(f"\n🕒 Starting trading cycle at UTC {now().strftime('%Y-%m-%d %H:%M:%S')}")

        # Memory (optional context)
        past_memories = load_memories(1)
        memory_text = "\n".join(past_memories) if past_memories else "No prior memories."

        # Account (assumed async in your hype_accounts_server)
        account_details = await hype_accounts_server.get_account_details()
        pretty_details = _format_account_details(account_details)

        # Every sentence ends with a newline so adjacent literals do not run
        # together in the prompt.
        trader_instructions = (
            "You are a cryptocurrency perpetuals trader.\n"
            f"Past insights:\n{memory_text}\n\n"
            f"Current holdings/balance:\n{pretty_details}\n\n"
            "You have tools for news, hourly technical indicators, holdings/balance, and Hyperliquid execution.\n"
            "When scanning for crypto news, scan broadly for any opportunities.\n"
            "When calling technical indicator tools, pass only the coin symbol as `name` (eg. BTC, ETH, SOL, XRP, SUI, HYPE, ZEC, etc.) Do NOT include exchange prefixes/suffixes or quote assets (no BINANCE:, no /USDT, no -PERP, no .PERP).\n"
            "Carry out trades as you see fit — take profits and cut losses as needed.\n"
            "Do not wait for instructions or ask for confirmation.\n"
            "Each transaction costs 0.04%.\n"
            "If signals are unclear, do not trade."
        )

        # High-level prompt to drive the agent plan
        prompt = (
            "Step 1 — Check holdings & balance.\n"
            "Step 2 — Scan crypto news broadly for ANY potential coin trading opportunities. Be open minded. Do not limit to a list.\n"
            "Step 3 — Select 3–5 symbols based on news & holdings. Analyze 1h indicators and propose strategies.\n"
            "Step 4 — Execute long/short only if edge exists; else abstain.\n"
            f"Now (UTC): {now().strftime('%Y-%m-%d %H:%M')}"
        )

        # Fresh servers each run. The lists are assigned before connecting so
        # the ``finally`` clause can shut down whatever did get started.
        hyper_servers = make_hyperliquid_trader_mcp_servers()
        news_servers = make_crypto_news_mcp_servers()
        ta_servers = make_technical_analyst_mcp_servers()
        await connect_all(hyper_servers)
        await connect_all(news_servers)
        await connect_all(ta_servers)

        # Bind tools to their respective servers
        crypto_news_tool = await get_crypto_news_researcher_tool(news_servers)
        ta_tool = await get_technical_analyst_researcher_tool(ta_servers)

        trader = Agent(
            name="crypto_trader",
            instructions=trader_instructions,
            tools=[crypto_news_tool, ta_tool],
            mcp_servers=hyper_servers,  # trading actions live here
            model=MODEL,
        )

        async def _run():
            with trace("crypto_trader"):
                return await Runner.run(trader, prompt, max_turns=30)

        result = await _run()
        save_memory(result.final_output)

        # A memory was just saved, so the table is loaded unconditionally
        # (previously the very first run returned an empty table).
        mem_df = load_memories_df(10)

        # Refresh the account snapshot so the "Post-Run" section really shows
        # the post-run state; fall back to the pre-run snapshot on failure.
        details_note = ""
        try:
            post_details = _format_account_details(
                await hype_accounts_server.get_account_details()
            )
        except Exception as exc:
            post_details = pretty_details
            details_note = f"_Post-run refresh failed ({exc!r}); showing pre-run snapshot._\n\n"

        output_md = (
            f"### ✅ Trade Completed UTC {now().strftime('%Y-%m-%d %H:%M')}\n\n"
            f"{result.final_output}\n\n"
            f"---\n\n"
            f"### 💳 Account Details (Post-Run)\n"
            f"{details_note}"
            f"```\n{post_details}\n```"
        )
        return output_md, mem_df

    except asyncio.TimeoutError:
        # Reached only if something awaited inside the cycle times out on its
        # own; the overall budget is enforced by safe_run_trader.
        return (
            f"❌ Timed out ({MAX_RUN_SECS}s). Consider raising timeout or reducing tool work per cycle.",
            _empty_mem_df(),
        )
    except Exception:
        err = traceback.format_exc()
        return f"❌ Error during trading cycle:\n```\n{err}\n```", _empty_mem_df()
    finally:
        await close_all(hyper_servers)
        await close_all(news_servers)
        await close_all(ta_servers)


# ====================== SCHEDULER / UI STATE ======================
LATEST_OUTPUT = "⏳ Waiting for first scheduled run…"
LATEST_MEM_DF = pd.DataFrame(columns=["ts", "summary"])
NEXT_RUN_AT = None
IS_RUNNING = False
LAST_RUN_STARTED_AT = None
LAST_RUN_ENDED_AT = None
STOP_TRADING = False
_STATE_LOCK = asyncio.Lock()
_SCHEDULER_STARTED = False
_RUN_LOCK = asyncio.Lock()
# Strong references to the background tasks: the event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-execution.
_BACKGROUND_TASKS = []


async def safe_run_trader(force: bool = False):
    """Run one trading cycle unless one is already running or trading is paused.

    Args:
        force: run even while ``STOP_TRADING`` is set (used by "Run Now").

    The cycle is bounded by ``MAX_RUN_SECS``. The outcome (report, timeout or
    traceback) is published to ``LATEST_OUTPUT`` / ``LATEST_MEM_DF``; nothing
    is raised to the caller.
    """
    global LATEST_OUTPUT, LATEST_MEM_DF, IS_RUNNING, LAST_RUN_STARTED_AT, LAST_RUN_ENDED_AT

    # No await between this check and acquiring the lock, so two callers on
    # the event loop cannot both pass it.
    if _RUN_LOCK.locked() or (STOP_TRADING and not force):
        return

    async with _RUN_LOCK:
        IS_RUNNING = True
        LAST_RUN_STARTED_AT = now()
        try:
            md, mem_df = await asyncio.wait_for(run_trader(), timeout=MAX_RUN_SECS)
            async with _STATE_LOCK:
                LATEST_OUTPUT = md or "(no output)"
                if isinstance(mem_df, pd.DataFrame) and not mem_df.empty:
                    LATEST_MEM_DF = mem_df.copy()
                else:
                    # run_trader reports its own failures as a "❌ …" report
                    # with an empty table; do not log those as completed.
                    failed = (md or "").startswith("❌")
                    stamp = now().strftime("%Y-%m-%d %H:%M:%S")
                    summary = "❌ Run failed" if failed else "✅ Run completed"
                    extra = pd.DataFrame([{"ts": stamp, "summary": summary}])
                    LATEST_MEM_DF = pd.concat([LATEST_MEM_DF, extra], ignore_index=True)
        except asyncio.TimeoutError:
            async with _STATE_LOCK:
                LATEST_OUTPUT = (
                    f"❌ Timed out ({MAX_RUN_SECS}s). "
                    "Consider raising timeout or reducing tool work per cycle."
                )
        except Exception:
            err = traceback.format_exc()
            async with _STATE_LOCK:
                LATEST_OUTPUT = f"❌ Exception\n```\n{err}\n```"
        finally:
            LAST_RUN_ENDED_AT = now()
            IS_RUNNING = False


# --- Manual "Run Now" trigger (async) ---
async def on_run_now():
    """Handle the "Run Now" button: run a cycle, then return fresh UI values."""
    # Force a run even if paused; lock prevents overlap
    await safe_run_trader(force=True)
    # Return the same tuple your timer updates, so the UI refreshes immediately
    latest, status, mem_df, csv_path = await ui_poll()
    return latest, status, mem_df, csv_path


async def run_scheduler():
    """Run trading cycles forever, aligned to ``RUN_INTERVAL_MINS`` in UTC.

    While paused, the loop polls every ``HEARTBEAT_SECS`` seconds.
    NOTE: after a resume the next loop iteration starts a cycle right away
    instead of waiting for the next aligned tick.
    """
    global NEXT_RUN_AT
    NEXT_RUN_AT = next_run_dt(RUN_INTERVAL_MINS)
    # sleep until next aligned tick
    await asyncio.sleep(max(0.0, (NEXT_RUN_AT - now()).total_seconds()))

    while True:
        if not STOP_TRADING:
            await safe_run_trader()
            next_at = next_run_dt(RUN_INTERVAL_MINS)
        else:
            next_at = None
        NEXT_RUN_AT = next_at

        # sleep towards the next run or heartbeat-period while paused.
        # The delay is computed from the local ``next_at``: STOP_TRADING is
        # flipped by UI handlers and may change between the check above and
        # this point, and subtracting from a None NEXT_RUN_AT would kill
        # the scheduler task.
        if next_at is None or STOP_TRADING:
            delay = float(HEARTBEAT_SECS)
        else:
            delay = max(0.0, (next_at - now()).total_seconds())
        await asyncio.sleep(delay)


def _state_label() -> str:
    """Return the paused/running/idle label shown in the UI and heartbeat."""
    if STOP_TRADING:
        return "⏸️ Paused"
    return "🟢 Running" if IS_RUNNING else "🟡 Idle"


async def heartbeat():
    """Print the scheduler state every ``HEARTBEAT_SECS`` seconds, forever."""
    while True:
        print(f"[{now():%H:%M:%S}] 💓 {_state_label()}")
        await asyncio.sleep(HEARTBEAT_SECS)


async def ensure_scheduler_started():
    """Start the heartbeat and scheduler tasks once (idempotent)."""
    global _SCHEDULER_STARTED
    if not _SCHEDULER_STARTED:
        _SCHEDULER_STARTED = True
        # Keep references so the tasks cannot be garbage-collected.
        _BACKGROUND_TASKS.append(asyncio.create_task(heartbeat()))
        _BACKGROUND_TASKS.append(asyncio.create_task(run_scheduler()))


def status_markdown():
    """Return the status panel (state, last start/end, next run) as markdown."""
    lines = [f"**Status:** {_state_label()}"]
    if LAST_RUN_STARTED_AT:
        lines.append(f"**Last start (UTC):** {LAST_RUN_STARTED_AT:%Y-%m-%d %H:%M:%S}")
    if LAST_RUN_ENDED_AT:
        lines.append(f"**Last end (UTC):** {LAST_RUN_ENDED_AT:%Y-%m-%d %H:%M:%S}")
    # Snapshot the global once: the scheduler may reset it to None.
    next_at = NEXT_RUN_AT
    if next_at and not STOP_TRADING:
        lines.append(f"**Next run (UTC):** {next_at:%Y-%m-%d %H:%M} (ETA {eta_str(next_at)})")
    if STOP_TRADING:
        lines.append("**Scheduler:** waiting for Resume")
    return "\n\n".join(lines)


async def ui_poll():
    """Return ``(latest_markdown, status_markdown, memories_df, csv_path)``.

    Also rewrites the memories CSV at ``CSV_PATH`` on every call.
    """
    async with _STATE_LOCK:
        latest = LATEST_OUTPUT
        mem_df = LATEST_MEM_DF.copy()
    csv_path = write_mem_csv(mem_df)
    return latest, status_markdown(), mem_df, csv_path


def _toggle_stop(flag: bool):
    """Set the pause flag and return the refreshed status markdown."""
    global STOP_TRADING
    STOP_TRADING = flag
    return status_markdown()


# ====================== GRADIO UI ======================
with gr.Blocks(fill_height=True) as demo:
    gr.Markdown("## 📈 GPT Crypto Trader — Live Board")
    # Two trailing spaces before the newline force a markdown line break.
    gr.Markdown(
        "[agent tracer](https://platform.openai.com/logs?api=traces)  \n"
        "[hyperliquid platform](https://app.hyperliquid.xyz/trade)"
    )

    with gr.Row():
        stop_btn = gr.Button("🛑 Stop", variant="stop")
        resume_btn = gr.Button("▶️ Resume", variant="primary")
        run_now_btn = gr.Button("⚡ Run Now", variant="secondary")

    latest_md = gr.Markdown(value=LATEST_OUTPUT)
    status_md = gr.Markdown(value=status_markdown())
    mem_table = gr.Dataframe(value=LATEST_MEM_DF, interactive=False, wrap=True, show_label=False)
    mem_csv = gr.File(interactive=False)

    stop_btn.click(lambda: _toggle_stop(True), outputs=[status_md])
    resume_btn.click(lambda: _toggle_stop(False), outputs=[status_md])
    run_now_btn.click(on_run_now, outputs=[latest_md, status_md, mem_table, mem_csv])

    # The scheduler is started lazily on the first page load.
    demo.load(ensure_scheduler_started)

    timer = gr.Timer(5.0, active=True)
    timer.tick(ui_poll, outputs=[latest_md, status_md, mem_table, mem_csv])


if __name__ == "__main__":
    # NOTE(security): share=True publishes this board, including the
    # Stop/Resume/Run Now trading controls, on a public URL without auth.
    # Consider launch(auth=...) or share=False for anything but a demo.
    demo.queue().launch(share=True)