Spaces:
Sleeping
Sleeping
| import os | |
| import asyncio | |
| import importlib | |
| import json | |
| import tempfile | |
| import traceback | |
| from datetime import datetime, timedelta, timezone | |
| import gradio as gr | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
| import hype_accounts_server # your trading MCP | |
| from agents import Agent, Runner, trace, Tool | |
| from agents.mcp import MCPServerStdio | |
| from memory_utils import load_memories, save_memory, load_memories_df | |
| # ====================== CONFIG ====================== | |
# Load .env before any lookup below; override=True is passed so values from
# the .env file take precedence over variables already in the environment.
load_dotenv(override=True)

# Model name handed to every Agent built in this module (fallback if unset).
MODEL = os.environ.get("OPENAI_MODEL", "gpt-5-mini")

# Scheduler cadence (minutes), heartbeat / paused-poll period (seconds) and
# the per-cycle timeout (seconds). All are parsed as integers.
RUN_INTERVAL_MINS = int(os.environ.get("RUN_INTERVAL_MINS", "60"))
HEARTBEAT_SECS = int(os.environ.get("HEARTBEAT_SECS", "5"))
MAX_RUN_SECS = int(os.environ.get("MAX_RUN_SECS", "500"))

# Location of the CSV export of the memories table offered for download.
CSV_PATH = os.path.join(tempfile.gettempdir(), "memories_latest.csv")

# Scripts launched as stdio MCP servers.
# If you use server2, set TA_SERVER_SCRIPT to 'hl_indicators_server2.py'.
TA_SERVER_SCRIPT = os.environ.get("TA_SERVER_SCRIPT", "hl_indicators_server.py")
NEWS_SERVER_SCRIPT = os.environ.get("NEWS_SERVER_SCRIPT", "crypto_news_scraper_server.py")
| # ====================== MCP SERVER FACTORIES ====================== | |
def make_hyperliquid_trader_mcp_servers():
    """Build the stdio MCP server list used for Hyperliquid trade execution.

    Returns:
        A one-element list with an unconnected ``MCPServerStdio`` that runs
        ``python3 -u hype_accounts_server.py`` with a 60 second client
        session timeout. The Hyperliquid credentials found in this process's
        environment are forwarded to the child process.
    """
    credential_names = (
        "HYPERLIQUID_API_KEY",
        "HYPERLIQUID_PRIVATE_KEY",
        "HYPERLIQUID_ACCOUNT_ADDRESS",
    )
    # Forward only the variables that are actually set. os.getenv() yields
    # None for a missing variable, and None is not a valid value in a
    # subprocess environment mapping (str -> str).
    child_env = {
        name: os.environ[name] for name in credential_names if name in os.environ
    }
    return [
        MCPServerStdio(
            {
                "command": "python3",
                "args": ["-u", "hype_accounts_server.py"],
                "env": child_env,
            },
            client_session_timeout_seconds=60,
        )
    ]
def make_crypto_news_mcp_servers():
    """Build the stdio MCP server list for the crypto news scraper.

    Returns:
        A one-element list with an unconnected ``MCPServerStdio`` that runs
        ``python3 -u NEWS_SERVER_SCRIPT`` with a 60 second client session
        timeout.
    """
    launch_params = {"command": "python3", "args": ["-u", NEWS_SERVER_SCRIPT]}
    news_server = MCPServerStdio(launch_params, client_session_timeout_seconds=60)
    return [news_server]
def make_technical_analyst_mcp_servers():
    """Build the stdio MCP server list for the technical-indicator tools.

    Returns:
        A one-element list with an unconnected ``MCPServerStdio`` that runs
        ``python3 -u TA_SERVER_SCRIPT`` with a 60 second client session
        timeout.
    """
    launch_params = {"command": "python3", "args": ["-u", TA_SERVER_SCRIPT]}
    ta_server = MCPServerStdio(launch_params, client_session_timeout_seconds=60)
    return [ta_server]
| # ====================== UTILITIES ====================== | |
async def connect_all(servers):
    """Connect every server in *servers*, one after another, in order.

    The first ``connect()`` that raises aborts the loop and propagates; the
    servers after it are not attempted.
    """
    pending = iter(servers)
    for server in pending:
        await server.connect()
async def close_all(servers):
    """Shut down every server in *servers* on a best-effort basis.

    MCP server objects from the Agents SDK release their subprocess through
    ``cleanup()``; ``close()`` is kept as a fallback for objects that only
    offer that method. A failure on one server is reported and does not
    prevent the remaining servers from being shut down.

    Args:
        servers: Iterable of server objects; may be empty.
    """
    for server in servers:
        shutdown = getattr(server, "cleanup", None) or getattr(server, "close", None)
        if shutdown is None:
            print(f"close_all: {server!r} has neither cleanup() nor close(); skipped")
            continue
        try:
            await shutdown()
        except Exception as exc:
            # Still best-effort, but leave a trace instead of hiding the leak.
            print(f"close_all: failed to shut down {server!r}: {exc!r}")
def now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    utc = timezone.utc
    return datetime.now(tz=utc)
def next_run_dt(interval_mins: int = RUN_INTERVAL_MINS) -> datetime:
    """Return the next UTC instant that falls on an ``interval_mins`` boundary.

    Boundaries are whole multiples of the interval counted from the Unix
    epoch. Example: now=10:07 UTC with interval=15 gives 10:15 UTC. When the
    current second is exactly on a boundary, the following boundary is
    returned.

    Raises:
        ValueError: if ``interval_mins`` is not greater than zero.
    """
    current = now().replace(microsecond=0)
    minutes = int(interval_mins)
    if minutes <= 0:
        raise ValueError("interval_mins must be > 0")
    step_secs = minutes * 60
    epoch_secs = int(current.timestamp())
    # Round down to the last boundary, then advance by one full step.
    completed_steps = epoch_secs // step_secs
    return datetime.fromtimestamp((completed_steps + 1) * step_secs, tz=timezone.utc)
def eta_str(target: datetime) -> str:
    """Format the time remaining until *target* as ``MM:SS``.

    A target in the past yields ``00:00``. The minutes field is not capped,
    so waits of 100 minutes or more use three or more digits.
    """
    remaining = int((target - now()).total_seconds())
    if remaining < 0:
        remaining = 0
    minutes, seconds = divmod(remaining, 60)
    return f"{minutes:02d}:{seconds:02d}"
def write_mem_csv(df):
    """Write the memories table to ``CSV_PATH`` and return that path.

    ``None`` or an empty frame is replaced by an empty frame with the
    ``ts`` / ``summary`` columns, so the file always carries that header.
    """
    has_rows = df is not None and not df.empty
    frame = df if has_rows else pd.DataFrame(columns=["ts", "summary"])
    frame.to_csv(CSV_PATH, index=False)
    return CSV_PATH
| # ====================== RESEARCHER AGENTS ====================== | |
async def get_crypto_news_researcher(mcp_servers) -> Agent:
    """Create the news-research agent bound to *mcp_servers*.

    The instructions embed the current UTC time, so they are rebuilt on every
    call.
    """
    timestamp = now().strftime("%Y-%m-%d %H:%M:%S")
    briefing = "".join(
        [
            "You are a cryptocurrency researcher. You can call MCP tools to gather news and surface ",
            "potential opportunities. If the user (or prompt) specifies coins (e.g., HYPE, BTC, ETH, XRP), ",
            "focus on those; otherwise scan broadly. Be concise and actionable.\n",
            f"Time (UTC): {timestamp}",
        ]
    )
    researcher = Agent(
        name="Crypto news researcher",
        instructions=briefing,
        model=MODEL,
        mcp_servers=mcp_servers,
    )
    return researcher
async def get_crypto_news_researcher_tool(mcp_servers) -> Tool:
    """Wrap the news-research agent as a tool named ``crypto_news_researcher``."""
    news_agent = await get_crypto_news_researcher(mcp_servers)
    tool_spec = {
        "tool_name": "crypto_news_researcher",
        "tool_description": "Research crypto news and summarize opportunities (broad or coin-specific).",
    }
    return news_agent.as_tool(**tool_spec)
async def get_technical_analyst_researcher(mcp_servers) -> Agent:
    """Create the technical-analysis agent bound to *mcp_servers*.

    The prompt is deliberately generic: the model decides which indicator
    tools to call and in what order. The current UTC time is appended, so
    the instructions are rebuilt on every call.
    """
    timestamp = now().strftime("%Y-%m-%d %H:%M:%S")
    sections = (
        # Role and goals
        "Role: Cryptocurrency technical researcher using MCP indicator tools.\n",
        "Goals:\n",
        "- Form a directional bias (long/short/neutral) for each requested coin/timeframe.\n",
        "- Summarize key trend/momentum/volatility/volume signals with latest values.\n",
        "- Optionally outline a trade *idea skeleton*, without enforcing a fixed strategy.\n",
        # Non-prescriptive reasoning hints
        "Reasoning Hints (non-prescriptive):\n",
        "- Trend (MA/EMA alignment, slope); Momentum (RSI/MACD); Volatility (ATR, squeezes); Volume/Flow (OBV/ADL/VWAP context); Strength (ADX).\n",
        "- Call out divergences, regime shifts, chop (low ADX), or news-led anomalies.\n",
        "- Keep caveats explicit (thin liquidity, event risk, conflicting signals).\n",
        # Tool-call defaults
        "Data Discipline:\n",
        "- Use only completed candles (exclude_current_bar=True).\n",
        "- Default params unless caller overrides:\n",
        " interval='1h', lookback_period=6, limit=600, include_signals=True, exclude_current_bar=True, return_last_only=False.\n",
        "- When calling tools, pass only the coin symbol as `name` (eg. BTC, ETH, SOL, XRP, SUI, etc.). No exchange prefixes/suffixes or quote assets.\n",
        "\n",
        # Expected answer layout
        "Output (per coin)\n",
        "1) One-line bias + confidence.\n",
        "2) Bullet signals with latest values and reasoning.\n",
        "3) Optional trade idea.\n",
        "4) Caveats. \n\n",
        "\n",
        f"Time (UTC): {timestamp}",
    )
    analyst = Agent(
        name="Crypto technical researcher",
        instructions="".join(sections),
        model=MODEL,
        mcp_servers=mcp_servers,
    )
    return analyst
async def get_technical_analyst_researcher_tool(mcp_servers) -> Tool:
    """Wrap the technical-analysis agent as a tool named ``crypto_technical_researcher``."""
    ta_agent = await get_technical_analyst_researcher(mcp_servers)
    tool_spec = {
        "tool_name": "crypto_technical_researcher",
        "tool_description": "Analyze TA indicators and propose trade ideas (coin/interval/lookback optional).",
    }
    return ta_agent.as_tool(**tool_spec)
| # ====================== TRADER CORE ====================== | |
def _format_account_details(details) -> str:
    """Render account details as indented JSON for dicts/lists, else ``str()``."""
    if isinstance(details, (dict, list)):
        return json.dumps(details, indent=2)
    return str(details)


async def run_trader():
    """Run one trading cycle and return ``(markdown_report, memories_df)``.

    Steps: load the most recent memory, fetch account details, start fresh
    MCP servers (trading, news, technical analysis), run the
    ``crypto_trader`` agent for at most 30 turns, store the agent's final
    output as a new memory, and build a markdown report.

    Returns:
        A 2-tuple ``(str, pandas.DataFrame)``. On a timeout or any other
        exception the string is an error message and the frame is an empty
        ``ts`` / ``summary`` table; failures are reported through the return
        value rather than raised. The MCP servers are always closed before
        returning.
    """
    # NOTE(review): several literals below contain mis-encoded characters
    # ("β", "π", ...), presumably emoji/dashes garbled by an encoding
    # round-trip. They are left as found; confirm against the original file.
    hyper_servers = []
    news_servers = []
    ta_servers = []
    try:
        print(f"\nπ Starting trading cycle at UTC {now().strftime('%Y-%m-%d %H:%M:%S')}")

        # Memory (optional context)
        past_memories = load_memories(1)
        memory_text = "\n".join(past_memories) if past_memories else "No prior memories."

        # Account snapshot taken before the run; it is part of the prompt.
        account_details = await hype_accounts_server.get_account_details()
        pretty_details = _format_account_details(account_details)

        # Every sentence ends with an explicit separator: adjacent string
        # literals are concatenated verbatim, so a missing "\n" would glue
        # two sentences together.
        trader_instructions = (
            "You are a cryptocurrency perpetuals trader.\n"
            f"Past insights:\n{memory_text}\n\n"
            f"Current holdings/balance:\n{pretty_details}\n\n"
            "You have tools for news, hourly technical indicators, holdings/balance, and Hyperliquid execution.\n"
            "When scanning for crypto news, scan broadly for any oppotunities.\n"
            "When calling technical indicator tools, pass only the coin symbol as `name` (eg. BTC, ETH, SOL, XRP, SUI, HYPE, ZEC, etc.) Do NOT include exchange prefixes/suffixes or quote assets (no BINANCE:, no /USDT, no -PERP, no .PERP).\n"
            "Carry out trades as you see fit β take profits and cut losses as needed.\n"
            "Do not wait for instructions or ask for confirmation.\n"
            "Each transaction costs 0.04%.\n"
            "If signals are unclear, do not trade."
        )

        # High-level prompt to drive the agent plan
        prompt = (
            "Step 1 β Check holdings & balance.\n"
            "Step 2 β Scan crypto news broadly for ANY potential coin trading opportunities. Be open minded. Do not limit to a list.\n"
            "Step 3 β Select 3β5 symbols based on news & holdings. Analyze 1h indicators and propose strategies.\n"
            "Step 4 β Execute long/short only if edge exists; else abstain.\n"
            f"Now (UTC): {now().strftime('%Y-%m-%d %H:%M')}"
        )

        # Fresh servers each run. All three lists are assigned before any
        # connect() so the finally-block can close whatever was created.
        hyper_servers = make_hyperliquid_trader_mcp_servers()
        news_servers = make_crypto_news_mcp_servers()
        ta_servers = make_technical_analyst_mcp_servers()
        await connect_all(hyper_servers)
        await connect_all(news_servers)
        await connect_all(ta_servers)

        # Bind tools to their respective servers
        crypto_news_tool = await get_crypto_news_researcher_tool(news_servers)
        ta_tool = await get_technical_analyst_researcher_tool(ta_servers)

        trader = Agent(
            name="crypto_trader",
            instructions=trader_instructions,
            tools=[crypto_news_tool, ta_tool],
            mcp_servers=hyper_servers,  # trading actions live here
            model=MODEL,
        )

        with trace("crypto_trader"):
            result = await Runner.run(trader, prompt, max_turns=30)

        save_memory(result.final_output)
        # NOTE(review): on the very first run (no prior memories) the memory
        # saved just above is not loaded into the table; confirm this is intended.
        mem_df = load_memories_df(10) if past_memories else pd.DataFrame(columns=["ts", "summary"])

        # The report section is labelled "Post-Run", so refresh the account
        # snapshot after the agent has acted. Fall back to the pre-run
        # snapshot, clearly marked, if the refresh fails.
        try:
            post_details = _format_account_details(
                await hype_accounts_server.get_account_details()
            )
        except Exception as exc:
            post_details = (
                f"(post-run refresh failed: {exc!r}; showing pre-run snapshot)\n"
                f"{pretty_details}"
            )

        output_md = (
            f"### β Trade Completed UTC {now().strftime('%Y-%m-%d %H:%M')}\n\n"
            f"{result.final_output}\n\n"
            f"---\n\n"
            f"### π³ Account Details (Post-Run)\n"
            f"```\n{post_details}\n```"
        )
        return output_md, mem_df
    except asyncio.TimeoutError:
        # Report the configured limit instead of a hard-coded number.
        return (
            f"β Timed out ({MAX_RUN_SECS}s). Consider raising timeout or reducing tool work per cycle.",
            pd.DataFrame(columns=["ts", "summary"]),
        )
    except Exception:
        err = traceback.format_exc()
        return f"β Error during trading cycle:\n```\n{err}\n```", pd.DataFrame(columns=["ts", "summary"])
    finally:
        await close_all(hyper_servers)
        await close_all(news_servers)
        await close_all(ta_servers)
| # ====================== SCHEDULER / UI STATE ====================== | |
# --- What the UI shows -------------------------------------------------
# Latest rendered report (markdown) and the memories table.
LATEST_OUTPUT: str = "β³ Waiting for first scheduled runβ¦"
LATEST_MEM_DF: pd.DataFrame = pd.DataFrame(columns=["ts", "summary"])

# --- Scheduler bookkeeping ----------------------------------------------
# The three timestamps hold aware UTC datetimes once set; None until then.
NEXT_RUN_AT = None
LAST_RUN_STARTED_AT = None
LAST_RUN_ENDED_AT = None
IS_RUNNING: bool = False          # True while a trading cycle is executing
STOP_TRADING: bool = False        # True while the user has paused scheduled runs
_SCHEDULER_STARTED: bool = False  # set once the background tasks are launched

# --- Synchronisation ------------------------------------------------------
# _STATE_LOCK guards LATEST_OUTPUT / LATEST_MEM_DF; _RUN_LOCK ensures that
# only one trading cycle runs at a time.
_STATE_LOCK = asyncio.Lock()
_RUN_LOCK = asyncio.Lock()
async def safe_run_trader(force: bool = False):
    """Run one trading cycle under the run lock and publish its result.

    Returns immediately, doing nothing, when a cycle is already in progress
    or when trading is paused and ``force`` is false. Otherwise the cycle is
    bounded by ``MAX_RUN_SECS``, and the report / memories table are stored
    in ``LATEST_OUTPUT`` / ``LATEST_MEM_DF`` for the UI to poll.

    Args:
        force: Run even while ``STOP_TRADING`` is set (manual "Run Now").
    """
    global LATEST_OUTPUT, LATEST_MEM_DF, IS_RUNNING, LAST_RUN_STARTED_AT, LAST_RUN_ENDED_AT
    if _RUN_LOCK.locked() or (STOP_TRADING and not force):
        return
    async with _RUN_LOCK:
        IS_RUNNING = True
        LAST_RUN_STARTED_AT = now()
        try:
            md, mem_df = await asyncio.wait_for(run_trader(), timeout=MAX_RUN_SECS)
            async with _STATE_LOCK:
                LATEST_OUTPUT = md or "(no output)"
                if isinstance(mem_df, pd.DataFrame) and not mem_df.empty:
                    LATEST_MEM_DF = mem_df.copy()
                else:
                    # No memories came back: append a marker row so the table
                    # still reflects that a cycle finished.
                    stamp = now().strftime("%Y-%m-%d %H:%M:%S")
                    extra = pd.DataFrame([{"ts": stamp, "summary": "β Run completed"}])
                    LATEST_MEM_DF = pd.concat([LATEST_MEM_DF, extra], ignore_index=True)
        except asyncio.TimeoutError:
            # wait_for() cancels run_trader() and raises here, so this is the
            # place where the cycle limit actually surfaces. Show a clear
            # message rather than a raw traceback.
            async with _STATE_LOCK:
                LATEST_OUTPUT = (
                    f"Timed out after {MAX_RUN_SECS}s (MAX_RUN_SECS). "
                    "Consider raising the timeout or reducing tool work per cycle."
                )
        except Exception:
            err = traceback.format_exc()
            async with _STATE_LOCK:
                LATEST_OUTPUT = f"β Exception\n```\n{err}\n```"
        finally:
            LAST_RUN_ENDED_AT = now()
            IS_RUNNING = False
| # --- Manual "Run Now" trigger (async) --- | |
async def on_run_now():
    """Handle the manual "Run Now" button.

    Forces a cycle even while paused (the run lock still prevents overlap),
    then returns the same 4-tuple the polling timer produces so the UI
    refreshes immediately.
    """
    await safe_run_trader(force=True)
    snapshot = await ui_poll()
    report, status, memories, csv_file = snapshot
    return report, status, memories, csv_file
async def run_scheduler():
    """Run trading cycles forever on the aligned ``RUN_INTERVAL_MINS`` grid.

    Sleeps until the first aligned tick, then loops. While trading is
    active it runs a cycle and sleeps until the next aligned tick. While
    paused it clears ``NEXT_RUN_AT`` and polls the pause flag every
    ``HEARTBEAT_SECS``.
    """
    global NEXT_RUN_AT
    NEXT_RUN_AT = next_run_dt(RUN_INTERVAL_MINS)
    # Sleep until the next aligned tick.
    await asyncio.sleep(max(0.0, (NEXT_RUN_AT - now()).total_seconds()))
    while True:
        if not STOP_TRADING:
            await safe_run_trader()
            NEXT_RUN_AT = next_run_dt(RUN_INTERVAL_MINS)
        else:
            NEXT_RUN_AT = None
        # STOP_TRADING can be flipped by a UI handler between the check above
        # and this point. Decide the delay from a local copy of the target so
        # a pause -> resume flip can never lead to `None - datetime`, which
        # would kill this task.
        target = NEXT_RUN_AT
        if STOP_TRADING or target is None:
            delay = HEARTBEAT_SECS
        else:
            delay = max(0.0, (target - now()).total_seconds())
        await asyncio.sleep(delay)
async def heartbeat():
    """Print a one-line state marker every ``HEARTBEAT_SECS`` seconds, forever."""
    while True:
        if STOP_TRADING:
            state = "βΈοΈ Paused"
        elif IS_RUNNING:
            state = "π’ Running"
        else:
            state = "π‘ Idle"
        print(f"[{now():%H:%M:%S}] π {state}")
        await asyncio.sleep(HEARTBEAT_SECS)
# Strong references to the long-lived background tasks. The event loop only
# keeps weak references to tasks, so a task whose handle is dropped may be
# garbage-collected before it finishes.
_BACKGROUND_TASKS = set()


async def ensure_scheduler_started():
    """Start the heartbeat and scheduler tasks once per process.

    Later calls are no-ops. Must be awaited from a running event loop,
    because it calls ``asyncio.create_task``.
    """
    global _SCHEDULER_STARTED
    if _SCHEDULER_STARTED:
        return
    _SCHEDULER_STARTED = True
    for coro in (heartbeat(), run_scheduler()):
        task = asyncio.create_task(coro)
        _BACKGROUND_TASKS.add(task)
        # Drop the reference once the task ends so the set cannot grow.
        task.add_done_callback(_BACKGROUND_TASKS.discard)
def status_markdown():
    """Render the scheduler state as markdown, one paragraph per field."""
    if STOP_TRADING:
        headline = "βΈοΈ Paused"
    elif IS_RUNNING:
        headline = "π’ Running"
    else:
        headline = "π‘ Idle"
    parts = [f"**Status:** {headline}"]

    if LAST_RUN_STARTED_AT:
        parts.append(f"**Last start (UTC):** {LAST_RUN_STARTED_AT:%Y-%m-%d %H:%M:%S}")
    if LAST_RUN_ENDED_AT:
        parts.append(f"**Last end (UTC):** {LAST_RUN_ENDED_AT:%Y-%m-%d %H:%M:%S}")

    # The next-run line is only meaningful while the scheduler is active.
    if NEXT_RUN_AT and not STOP_TRADING:
        parts.append(f"**Next run (UTC):** {NEXT_RUN_AT:%Y-%m-%d %H:%M} (ETA {eta_str(NEXT_RUN_AT)})")
    if STOP_TRADING:
        parts.append("**Scheduler:** waiting for Resume")

    return "\n\n".join(parts)
async def ui_poll():
    """Return ``(report_md, status_md, memories_df, csv_path)`` for the UI.

    The report and a copy of the memories table are read under the state
    lock; the CSV export and status text are produced after it is released.
    """
    async with _STATE_LOCK:
        report, memories = LATEST_OUTPUT, LATEST_MEM_DF.copy()
    csv_file = write_mem_csv(memories)
    status = status_markdown()
    return report, status, memories, csv_file
def _toggle_stop(flag: bool):
    """Set the pause flag to *flag* and return the refreshed status markdown."""
    global STOP_TRADING
    STOP_TRADING = flag
    refreshed_status = status_markdown()
    return refreshed_status
| # ====================== GRADIO UI ====================== | |
# NOTE(review): several labels below contain mis-encoded characters ("π",
# "β", ...), presumably emoji garbled by an encoding round-trip. They are
# left as found; confirm against the original file.
with gr.Blocks(fill_height=True) as demo:
    # Header and external links
    gr.Markdown("## π GPT Crypto Trader β Live Board")
    gr.Markdown(
        "[agent tracer](https://platform.openai.com/logs?api=traces) \n"
        "[hyperliquid platform](https://app.hyperliquid.xyz/trade)"
    )

    # Control buttons
    with gr.Row():
        stop_btn = gr.Button("π Stop", variant="stop")
        resume_btn = gr.Button("βΆοΈ Resume", variant="primary")
        run_now_btn = gr.Button("β‘ Run Now", variant="secondary")

    # Live board: latest report, scheduler status, memories table, CSV download
    latest_md = gr.Markdown(value=LATEST_OUTPUT)
    status_md = gr.Markdown(value=status_markdown())
    mem_table = gr.Dataframe(value=LATEST_MEM_DF, interactive=False, wrap=True, show_label=False)
    mem_csv = gr.File(interactive=False)

    # Pause / resume only refresh the status panel.
    stop_btn.click(lambda: _toggle_stop(True), outputs=[status_md])
    resume_btn.click(lambda: _toggle_stop(False), outputs=[status_md])

    # "Run Now" refreshes the same four components as the polling timer.
    run_now_btn.click(on_run_now, outputs=[latest_md, status_md, mem_table, mem_csv])

    # Launch the background scheduler when the page loads; the function
    # guards against starting twice.
    demo.load(ensure_scheduler_started)

    # Poll the shared state every 5 seconds.
    timer = gr.Timer(5.0, active=True)
    timer.tick(ui_poll, outputs=[latest_md, status_md, mem_table, mem_csv])
if __name__ == "__main__":
    # NOTE(review): share=True requests a public Gradio link, and this UI can
    # trigger live trades ("Run Now"). Confirm that exposure is intended.
    queued_app = demo.queue()
    queued_app.launch(share=True)