# app.py import os import re import asyncio from datetime import datetime from zoneinfo import ZoneInfo import gradio as gr import pandas as pd from dotenv import load_dotenv from agents import Agent, Runner, trace, Tool from agents.mcp import MCPServerStdio # Your local helper modules import hype_accounts_server from memory_utils import load_memories, save_memory, load_memories_df load_dotenv(override=True) # === Time / Locale === SGT = ZoneInfo("Asia/Singapore") def now_sgt(): return datetime.now(SGT) # === MCP Server Factories === def make_hyperliquid_trader_mcp_servers(): return [MCPServerStdio( {"command": "python3", "args": ["-u", "hype_accounts_server.py"], "env": { "HYPERLIQUID_API_KEY": os.getenv("HYPERLIQUID_API_KEY"), "HYPERLIQUID_PRIVATE_KEY": os.getenv("HYPERLIQUID_PRIVATE_KEY"), "HYPERLIQUID_ACCOUNT_ADDRESS": os.getenv("HYPERLIQUID_ACCOUNT_ADDRESS"), }}, client_session_timeout_seconds=30 )] def make_crypto_news_mcp_servers(): # Uses your scraper-based news MCP to avoid API plan limits return [MCPServerStdio( {"command": "python3", "args": ["-u", "crypto_news_scraper_server.py"]}, client_session_timeout_seconds=30 )] def make_technical_analyst_mcp_servers(): return [MCPServerStdio( {"command": "python3", "args": ["-u", "hl_indicators_server.py"]}, client_session_timeout_seconds=30 )] # === Utils for MCP lifecycle === async def connect_all(servers): for s in servers: await s.connect() async def close_all(servers): for s in servers: try: await s.close() except Exception: pass # === Agent Builders === async def build_news_tool(news_servers) -> Tool: instructions = ( "You are a cryptocurrency researcher. You can search and summarise the most relevant, " "recent crypto news. If the user asks about a specific coin (e.g., HYPE, BTC, ETH, XRP), " "focus on that. Otherwise, highlight notable events and potential long/short opportunities. " f"Current datetime (SGT): {now_sgt():%Y-%m-%d %H:%M:%S}." ) agent = Agent( name="Crypto news researcher", instructions=instructions, model="gpt-4.1-mini", mcp_servers=news_servers, ) return agent.as_tool( tool_name="crypto_news_researcher", tool_description="Research crypto news and opportunities for a coin or broad scan." ) async def build_ta_tool(ta_servers) -> Tool: instructions = ( "You are a cryptocurrency perpetuals technical trading researcher.\n" "Default interval: 1h; default lookback: 36.\n" "Indicators: EMA(20,200), MACD(12,26,9), StochRSI(14,14,3,3), ADL, Volume.\n" "Given a coin/interval/lookback, compute indicator state, infer trend, and propose entries, " "exits, and stop-loss/take-profit with reasoning.\n" f"Current datetime (SGT): {now_sgt():%Y-%m-%d %H:%M:%S}." ) agent = Agent( name="Crypto technical researcher", instructions=instructions, model="gpt-4.1-mini", mcp_servers=ta_servers, ) return agent.as_tool( tool_name="crypto_technical_researcher", tool_description="Run TA (EMA, MACD, StochRSI, ADL, Volume)." ) async def build_trader(hyper_servers, tools: list[Tool]) -> Agent: # Pull short memory + balances so the agent can context-switch well past_memories = load_memories(5) memory_text = "\n".join(past_memories) if past_memories else "No prior memories." try: account_details = await hype_accounts_server.get_account_details() except Exception as e: account_details = f"(Could not fetch account details: {e})" instructions = f""" You are a cryptocurrency perpetuals trader that can: - Query account balances/positions (via MCP servers on Hyperliquid). - Do market/news research and TA using attached tools. - Place long/short orders when the setup has clear edge. Transaction cost: 0.04%. - If signals are unclear, do NOT trade. Recent notes: {memory_text} Account state: {account_details} General rules: - Prefer confluence: trend + momentum + volume/ADL agreement. - Always suggest stop-loss and take-profit levels. - Keep risk per trade modest. Avoid overtrading. """ trader = Agent( name="crypto_trader", instructions=instructions, tools=tools, mcp_servers=hyper_servers, # these expose trading actions model="gpt-4.1-mini", ) return trader # === Intent Routing === COMMAND_HELP = """\ You can ask in natural language, e.g.: • "Balance" / "portfolio" — show Hyperliquid balances/positions • "News on BTC and ETH" — market research • "TA HYPE 1h lookback 48" — technical analysis • "Long HYPE 500 at market, SL 2% TP 4%" — execute trade • "Short BTC 0.01 at 68000, SL 69000 TP 66000" — limit order example • "Summarize opportunities today" — broad scan (news + TA) """ RE_TA = re.compile(r"\bTA\s+([A-Za-z0-9_\-]+)(?:\s+(\d+[mhHdD]))?(?:\s+lookback\s+(\d+))?", re.IGNORECASE) RE_LONG = re.compile(r"\bLONG\s+([A-Za-z0-9_\-]+)\s+([\d.]+)(?:\s+at\s+(market|mkt|[\d.]+))?(?:.*?\bSL\s+([\d.%]+))?(?:.*?\bTP\s+([\d.%]+))?", re.IGNORECASE) RE_SHORT = re.compile(r"\bSHORT\s+([A-Za-z0-9_\-]+)\s+([\d.]+)(?:\s+at\s+(market|mkt|[\d.]+))?(?:.*?\bSL\s+([\d.%]+))?(?:.*?\bTP\s+([\d.%]+))?", re.IGNORECASE) RE_CLOSE = re.compile(r"\b(close|exit|flatten)\s+(all|[A-Za-z0-9_\-]+)(?:\s+(\d+)%|\s+([\d.]+))?", re.IGNORECASE) def _close_desc(coin_or_all: str, pct: str | None, qty: str | None) -> str: coin_or_all = coin_or_all.upper() if coin_or_all == "ALL": return "Close ALL open positions at market" if pct: return f"Close {pct}% of {coin_or_all} position at market" if qty: return f"Reduce {coin_or_all} position by {qty} units at market" return f"Close {coin_or_all} position at market" def pct_or_price(s): if not s: return None s = s.strip().lower() if s.endswith("%"): try: return {"type": "percent", "value": float(s[:-1])} except: return None try: return {"type": "price", "value": float(s)} except: return None # === Core Chatbot Handler === async def handle_message(message: str, history: list[tuple[str, str]]): """ Routes user intent to: balance, news, TA, or trade execution. Returns markdown text. """ text = (message or "").strip() ts = now_sgt().strftime("%Y-%m-%d %H:%M:%S %Z") # Quick help if text.lower() in {"help", "/help", "commands"}: return f"### Commands\n{COMMAND_HELP}" # 1) Balance / portfolio if re.search(r"\b(balance|portfolio|positions?)\b", text, re.IGNORECASE): try: acct = await hype_accounts_server.get_account_details() save_memory(f"[{now_sgt():%Y-%m-%d %H:%M:%S %Z}] User checked balance.") return format_account_for_chat(acct) except Exception as e: return f"❌ Error fetching account details: `{e}`" # 2) TA intent m = RE_TA.search(text) if m: coin = m.group(1).upper() interval = (m.group(2) or "1h").lower() lookback = int(m.group(3) or 36) news_servers = [] # not needed here ta_servers = [] try: ta_servers = make_technical_analyst_mcp_servers() await connect_all(ta_servers) ta_tool = await build_ta_tool(ta_servers) # Build a "TA-only" agent so we don't touch trading MPC here researcher = Agent( name="crypto_ta_agent", instructions=f"Focus on TA for {coin} at interval {interval}, lookback {lookback}. Output indicator values and strategy.", tools=[ta_tool], model="gpt-4.1-mini", ) prompt = f"Run TA for {coin} on {interval}, lookback {lookback}. Return indicators and actionable plan." with trace("crypto_ta"): result = await Runner.run(researcher, prompt, max_turns=12) save_memory(f"[{ts}] TA {coin} {interval} lookback {lookback}") return f"### 🔬 TA — {coin} ({interval}, lookback {lookback})\n\n{result.final_output}" except Exception as e: return f"❌ TA error: `{e}`" finally: await close_all(ta_servers) # 3) Trade intent (LONG / SHORT) mm = RE_LONG.search(text) or RE_SHORT.search(text) if mm: is_long = bool(RE_LONG.search(text)) side = "LONG" if is_long else "SHORT" coin = mm.group(1).upper() qty = float(mm.group(2)) at = mm.group(3) # "market"/"mkt" or price sl_raw = mm.group(4) tp_raw = mm.group(5) sl = pct_or_price(sl_raw) tp = pct_or_price(tp_raw) price_desc = "market" if (at is None or str(at).lower() in {"market", "mkt"}) else at order_desc = f"{side} {coin} {qty} at {price_desc}" if sl: order_desc += f", SL {sl_raw}" if tp: order_desc += f", TP {tp_raw}" hyper_servers = [] news_servers = [] ta_servers = [] try: # Tools available to the *trader*: news + TA news_servers = make_crypto_news_mcp_servers() ta_servers = make_technical_analyst_mcp_servers() hyper_servers = make_hyperliquid_trader_mcp_servers() await asyncio.gather( connect_all(news_servers), connect_all(ta_servers), connect_all(hyper_servers), ) news_tool = await build_news_tool(news_servers) ta_tool = await build_ta_tool(ta_servers) trader = await build_trader(hyper_servers, [news_tool, ta_tool]) # Natural-language trade instruction to the trader agent. trade_prompt = f""" User requested: {order_desc}. If safe and reasonable given risk rules, place the order via Hyperliquid MCP. - If price specified (numeric), treat as limit; otherwise market. - Always include stop-loss and take-profit (convert % to prices). - Confirm the exact order(s) you placed and rationale in the output. """ with trace("trade_execution"): result = await Runner.run(trader, trade_prompt, max_turns=20) save_memory(f"[{ts}] Executed: {order_desc}") return f"### 🧾 Execution — {order_desc}\n\n{result.final_output}" except Exception as e: return f"❌ Trade execution error: `{e}`" finally: await asyncio.gather( close_all(news_servers), close_all(ta_servers), close_all(hyper_servers), ) # 4) News intent (e.g., "news on BTC", "what's happening to HYPE") if re.search(r"\b(news|headline|what's happening|what is happening|happening)\b", text, re.IGNORECASE): # Try to pick coins mentioned coins = re.findall(r"\b([A-Z]{2,6})\b", text.upper()) coins = [c for c in coins if c not in {"NEWS", "HELP"}] topic = ", ".join(coins) if coins else "broad market" news_servers = [] try: news_servers = make_crypto_news_mcp_servers() await connect_all(news_servers) news_tool = await build_news_tool(news_servers) researcher = Agent( name="crypto_news_agent", instructions=f"Focus news on: {topic}. Be concise and actionable.", tools=[news_tool], model="gpt-4.1-mini", ) prompt = f"Summarize the most relevant crypto news for {topic}. Include potential trade angles." with trace("crypto_news"): result = await Runner.run(researcher, prompt, max_turns=12) save_memory(f"[{ts}] News requested: {topic}") return f"### 🗞️ News — {topic}\n\n{result.final_output}" except Exception as e: return f"❌ News error: `{e}`" finally: await close_all(news_servers) # 5) Summary scan (news + TA picks) if re.search(r"\b(opportunit|ideas|setup|summary|today)\b", text, re.IGNORECASE): hyper_servers = [] news_servers = [] ta_servers = [] try: news_servers = make_crypto_news_mcp_servers() ta_servers = make_technical_analyst_mcp_servers() hyper_servers = make_hyperliquid_trader_mcp_servers() await asyncio.gather( connect_all(news_servers), connect_all(ta_servers), connect_all(hyper_servers), ) news_tool = await build_news_tool(news_servers) ta_tool = await build_ta_tool(ta_servers) trader = await build_trader(hyper_servers, [news_tool, ta_tool]) prompt = ( "Step 1: Broad news scan for major catalysts.\n" "Step 2: Pick 3–5 coins with potential edges; run compact TA summary (1h, lookback 36).\n" "Step 3: Recommend 1–2 best setups with entry, SL, TP and rationale. Do NOT place orders." ) with trace("daily_opportunities"): result = await Runner.run(trader, prompt, max_turns=24) save_memory(f"[{ts}] Opportunity summary requested.") return f"### 📌 Opportunities — {ts}\n\n{result.final_output}" except Exception as e: return f"❌ Summary error: `{e}`" finally: await asyncio.gather( close_all(news_servers), close_all(ta_servers), close_all(hyper_servers), ) # Fallback: clarify + brief help return ( "I can help with balance, news, TA, and trade execution.\n\n" + COMMAND_HELP ) mclose = RE_CLOSE.search(text) if mclose: # groups: verb, coin_or_all, pct (digits%), qty (number) coin_or_all = mclose.group(2).strip() pct = mclose.group(3) # e.g., "50" meaning 50% qty = mclose.group(4) # absolute size to reduce desc = _close_desc(coin_or_all, pct, qty) hyper_servers = [] news_servers = [] ta_servers = [] try: # Tools for trader context (optional but helpful) news_servers = make_crypto_news_mcp_servers() ta_servers = make_technical_analyst_mcp_servers() hyper_servers = make_hyperliquid_trader_mcp_servers() await asyncio.gather( connect_all(news_servers), connect_all(ta_servers), connect_all(hyper_servers), ) news_tool = await build_news_tool(news_servers) ta_tool = await build_ta_tool(ta_servers) trader = await build_trader(hyper_servers, [news_tool, ta_tool]) # Natural-language prompt to place the close orders via Hyperliquid MCP # (The trader agent already has the rules + account context) trade_prompt = f""" User request: {desc}. Instructions: - If 'ALL', close every open position at market. - If a coin is specified: - If a percent is provided, close that % of the CURRENT open position size. - If a qty is provided, reduce by that absolute base-asset amount. - If neither provided, fully close that coin position. - Include SL/TP cleanup if needed (cancel/replace any attached orders). - If the coin has no open position, report that clearly. - Return a concise execution summary listing each order (coin, side, size, order type, price if applicable) and rationale. """ with trace("close_positions"): result = await Runner.run(trader, trade_prompt, max_turns=20) save_memory(f"[{now_sgt():%Y-%m-%d %H:%M}] Close: {desc}") return f"### 🧹 Close — {desc}\n\n{result.final_output}" except Exception as e: return f"❌ Close error: `{e}`" finally: await asyncio.gather( close_all(news_servers), close_all(ta_servers), close_all(hyper_servers), ) # ---------- Pretty printing for account/positions ---------- from math import isnan def _fnum(x, decimals=2): try: v = float(x) return f"{v:,.{decimals}f}" except Exception: return str(x) def _fpct(x, decimals=2): try: v = float(x) * 100 # input is ROE like 0.0036 -> 0.36% sign = "🟢" if v > 0 else ("🔴" if v < 0 else "⚪️") return f"{sign} {v:.{decimals}f}%" except Exception: return "—" def _pnl(x, decimals=2): try: v = float(x) sign = "🟢" if v > 0 else ("🔴" if v < 0 else "⚪️") return f"{sign} ${abs(v):,.{decimals}f}" except Exception: return "—" def _side_and_abs_size(szi): try: v = float(szi) side = "LONG" if v > 0 else ("SHORT" if v < 0 else "FLAT") return side, abs(v) except Exception: return "—", szi def format_account_for_chat(acct: dict) -> str: """ Converts the get_account_details() dict into a nice Markdown summary. """ if not isinstance(acct, dict): return f"```\n{acct}\n```" holdings = acct.get("holdings", []) or [] cash = acct.get("cash_balance", "0") realized_pnl = acct.get("profit_and_loss", None) # Totals total_pos_value = 0.0 total_margin_used = 0.0 total_upnl = 0.0 rows_md = [] for h in holdings: pos = h.get("position", {}) coin = pos.get("coin", "—") szi = pos.get("szi", 0) side, abs_size = _side_and_abs_size(szi) entry = pos.get("entryPx", "—") pval = pos.get("positionValue", 0) u = pos.get("unrealizedPnl", 0) roe = pos.get("returnOnEquity", 0) lev = pos.get("leverage", {}) lev_str = f"{lev.get('type','—')}×{lev.get('value','—')}" liq = pos.get("liquidationPx", None) m_used = pos.get("marginUsed", 0) fund = pos.get("cumFunding", {}).get("sinceOpen", None) # Totals try: total_pos_value += float(pval) except: pass try: total_margin_used += float(m_used) except: pass try: total_upnl += float(u) except: pass rows_md.append( f"| {coin} | {side} | {_fnum(abs_size, 6)} | ${_fnum(entry, 2)} | ${_fnum(pval, 2)} | {_pnl(u, 2)} | {_fpct(roe, 2)} | {lev_str} | {('—' if liq in (None, 'None') else '$'+_fnum(liq, 2))} | ${_fnum(m_used, 2)} | {('—' if fund in (None, 'None') else _fnum(fund, 6))} |" ) header = ( "### 📊 Account / Positions\n" f"- **Cash balance:** ${_fnum(cash, 2)}\n" f"- **Total pos. value:** ${_fnum(total_pos_value, 2)}\n" f"- **Unrealized PnL:** {_pnl(total_upnl, 2)}\n" f"- **Margin used (total):** ${_fnum(total_margin_used, 2)}\n" ) if realized_pnl is not None: header += f"- **Realized PnL (session/period):** {_pnl(realized_pnl, 2)}\n" table_head = ( "\n| Coin | Side | Size | Entry Px | Pos. Value | uPnL | ROE | Leverage | Liq Px | Margin Used | Funding (since open) |\n" "|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|\n" ) table_body = "\n".join(rows_md) if rows_md else "_No open positions_" return header + table_head + table_body # === Gradio UI === with gr.Blocks(fill_height=True) as demo: gr.Markdown("# 🤖 Crypto Trading Copilot") gr.Markdown( f"Local time: **{now_sgt():%Y-%m-%d %H:%M:%S %Z}** \n" "[OpenAI Traces](https://platform.openai.com/logs?api=traces) · " "[Hyperliquid](https://app.hyperliquid.xyz/trade)" ) with gr.Row(): quick1 = gr.Button("📊 Balance") quick2 = gr.Button("🗞️ News: BTC, ETH") quick3 = gr.Button("🔬 TA: HYPE 1h") quick4 = gr.Button("🧾 Long HYPE 500 @ market (SL 2% TP 4%)") chatbot = gr.Chatbot(height=480, type="messages", show_copy_button=True) user_in = gr.Textbox(placeholder="Try: TA HYPE 1h lookback 48 • News on BTC • Long HYPE 500 at market, SL 2% TP 4%", scale=1) send_btn = gr.Button("Send", variant="primary") with gr.Accordion("Memory (last 10)", open=False): mem_table = gr.Dataframe(value=load_memories_df(10), interactive=False, wrap=True, show_label=False) async def _respond(user_msg, chat_state): bot_md = await handle_message(user_msg, chat_state or []) # Log short memory line save_memory(f"[{now_sgt():%Y-%m-%d %H:%M}] {user_msg[:80]}") # Update display memory table latest_mem = load_memories_df(10) return chat_state + [{"role":"user","content":user_msg},{"role":"assistant","content":bot_md}], "", latest_mem send_btn.click(_respond, inputs=[user_in, chatbot], outputs=[chatbot, user_in, mem_table]) user_in.submit(_respond, inputs=[user_in, chatbot], outputs=[chatbot, user_in, mem_table]) # Quick actions async def _qa_balance(chat_state): msg = "balance" bot_md = await handle_message(msg, chat_state or []) save_memory(f"[{now_sgt():%Y-%m-%d %H:%M}] Quick: balance") latest_mem = load_memories_df(10) return chat_state + [{"role":"user","content":msg},{"role":"assistant","content":bot_md}], latest_mem async def _qa_news(chat_state): msg = "news on BTC and ETH" bot_md = await handle_message(msg, chat_state or []) save_memory(f"[{now_sgt():%Y-%m-%d %H:%M}] Quick: news BTC ETH") latest_mem = load_memories_df(10) return chat_state + [{"role":"user","content":msg},{"role":"assistant","content":bot_md}], latest_mem async def _qa_ta(chat_state): msg = "TA HYPE 1h lookback 48" bot_md = await handle_message(msg, chat_state or []) save_memory(f"[{now_sgt():%Y-%m-%d %H:%M}] Quick: TA HYPE") latest_mem = load_memories_df(10) return chat_state + [{"role":"user","content":msg},{"role":"assistant","content":bot_md}], latest_mem async def _qa_long(chat_state): msg = "Long HYPE 500 at market, SL 2% TP 4%" bot_md = await handle_message(msg, chat_state or []) save_memory(f"[{now_sgt():%Y-%m-%d %H:%M}] Quick: long HYPE") latest_mem = load_memories_df(10) return chat_state + [{"role":"user","content":msg},{"role":"assistant","content":bot_md}], latest_mem quick1.click(_qa_balance, inputs=[chatbot], outputs=[chatbot, mem_table]) quick2.click(_qa_news, inputs=[chatbot], outputs=[chatbot, mem_table]) quick3.click(_qa_ta, inputs=[chatbot], outputs=[chatbot, mem_table]) quick4.click(_qa_long, inputs=[chatbot], outputs=[chatbot, mem_table]) if __name__ == "__main__": # No deprecated args; queue() OK without concurrency_count demo.queue().launch()