Hex / app.py
OxMxO's picture
Update app.py
f4df903 verified
raw
history blame
11.1 kB
"""
Single-Space app for HexStrike on Hugging Face — chat UI + remote MCP endpoint.
Pinned for Gradio 5.x. If you bump to Gradio 6, you'll need to:
* Remove the `type="messages"` kwarg from gr.ChatInterface (gone in 6.x).
* Move `theme=` off gr.Blocks() — it lives on demo.launch() now.
* Update history format to `[{role, content: [{type:"text", text:...}]}]`.
This file is the foreground process on port 7860. It does three things:
1. Spawns `mcp-proxy` as a background subprocess on 127.0.0.1:8765,
wrapping the upstream `hexstrike_mcp.py` stdio server.
2. Runs a Gradio chat UI at / driven by a Hugging Face Inference API
LLM (currently swiss-ai/Apertus-70B-Instruct-2509, see HF_MODEL). The LLM emits tool calls;
we forward them to mcp-proxy, feed results back, loop until done.
3. Reverse-proxies /servers/hexstrike/* onto mcp-proxy so external MCP
clients (Claude Desktop, Cursor, the bundled CLI agent) can still
connect remotely, guarded by the same bearer token.
"""
from __future__ import annotations

import asyncio
import atexit
import hmac
import json
import logging
import os
import subprocess
import sys
import time
from contextlib import asynccontextmanager

import gradio as gr
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, StreamingResponse
from huggingface_hub import InferenceClient
from mcp import ClientSession
from mcp.client.sse import sse_client
# ─── Config ──────────────────────────────────────────────────────────────────
# Secrets are read from the environment; an unset variable becomes "".
HEXSTRIKE_TOKEN = os.environ.get("HEXSTRIKE_TOKEN", "")
HF_TOKEN = os.environ.get("HF_TOKEN", "")
HF_MODEL = "swiss-ai/Apertus-70B-Instruct-2509"
HF_PROVIDER = "publicai"
MCP_INTERNAL_PORT = 8765
MCP_LOCAL_URL = f"http://127.0.0.1:{MCP_INTERNAL_PORT}/servers/hexstrike/sse"

# Logging is configured before the remaining settings are parsed so that a bad
# value can be reported through the normal log format.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("hexstrike-app")

# Maximum number of LLM round-trips per chat message. A non-numeric or
# non-positive override falls back to the default instead of crashing at import
# time or silently disabling the agent loop.
_DEFAULT_MAX_STEPS = 8
_raw_max_steps = os.environ.get("HEXSTRIKE_MAX_STEPS", str(_DEFAULT_MAX_STEPS))
try:
    MAX_STEPS = int(_raw_max_steps)
except ValueError:
    MAX_STEPS = 0
if MAX_STEPS < 1:
    log.warning(
        "Ignoring invalid HEXSTRIKE_MAX_STEPS=%r; using %d.",
        _raw_max_steps,
        _DEFAULT_MAX_STEPS,
    )
    MAX_STEPS = _DEFAULT_MAX_STEPS

if not HEXSTRIKE_TOKEN:
    # Without the token the process exits before anything else is started.
    log.error("FATAL: HEXSTRIKE_TOKEN secret is not set. Refusing to start.")
    sys.exit(1)
if not HF_TOKEN:
    log.warning("HF_TOKEN is not set — chat UI will refuse to call the LLM.")
# ─── Spawn mcp-proxy as a background subprocess ──────────────────────────────
def start_mcp_proxy() -> subprocess.Popen:
    """Launch ``mcp-proxy`` on loopback and wait for it to accept connections.

    The child inherits the current environment plus ``API_ACCESS_TOKEN`` set to
    ``HEXSTRIKE_TOKEN``, and is asked to expose ``hexstrike_mcp.py`` as the
    named server ``hexstrike``. Termination of the child is registered with
    ``atexit``.

    Returns:
        The ``Popen`` handle. It is returned even when the proxy exits early or
        does not start listening within 30 seconds; both cases are logged as
        errors rather than raised.
    """
    log.info("Spawning mcp-proxy on 127.0.0.1:%d", MCP_INTERNAL_PORT)
    env = os.environ.copy()
    env["API_ACCESS_TOKEN"] = HEXSTRIKE_TOKEN
    cmd = [
        "mcp-proxy",
        "--host", "127.0.0.1",
        "--port", str(MCP_INTERNAL_PORT),
        "--allow-origin=*",
        "--named-server", "hexstrike",
        "python3 /home/user/app/hexstrike/hexstrike_mcp.py --server http://127.0.0.1:8888",
    ]
    proc = subprocess.Popen(cmd, env=env)
    atexit.register(proc.terminate)

    # A wall-clock deadline keeps the wait at 30s even when a probe spends its
    # full 1s timeout, which a fixed iteration count did not guarantee.
    deadline = time.monotonic() + 30.0
    probe_url = f"http://127.0.0.1:{MCP_INTERNAL_PORT}/"
    with httpx.Client(timeout=1.0) as probe:
        while time.monotonic() < deadline:
            exit_code = proc.poll()
            if exit_code is not None:
                # The child is gone; polling the port any longer is pointless.
                log.error("mcp-proxy exited during startup with code %s", exit_code)
                return proc
            try:
                # Any HTTP response, whatever its status, proves the port is bound.
                probe.get(probe_url)
            except httpx.HTTPError:
                time.sleep(0.5)
            else:
                log.info("✅ mcp-proxy is listening")
                return proc
    log.error("mcp-proxy failed to bind within 30s")
    return proc
# Module-level call: the proxy is spawned as soon as this module is imported.
# The handle is kept so other code in this file can inspect the child process.
MCP_PROC = start_mcp_proxy()
# ─── LLM agent loop ──────────────────────────────────────────────────────────
# System message placed first in every conversation sent to the LLM.
SYSTEM_PROMPT = (
    "You are a cautious offensive-security assistant with access to 150+ "
    "HexStrike pentesting tools via MCP. RULES:\n"
    "1. You may ONLY call tools against targets the user EXPLICITLY says they "
    " own or have written permission to test. If unclear, ASK before scanning.\n"
    "2. Prefer light recon (analyze_target, whatweb_fingerprint, nmap -sV) "
    " before any active or intrusive scanning.\n"
    "3. Tools that need root (masscan, responder, nmap -sS) will fail in this "
    " environment — use TCP-connect alternatives (e.g. nmap -sT) instead.\n"
    "4. Summarize findings clearly at the end. Never claim a vulnerability "
    " exists without evidence from a tool's output."
)
@asynccontextmanager
async def open_mcp():
    """Yield an initialized MCP ``ClientSession`` connected to ``MCP_LOCAL_URL``.

    The SSE connection carries ``HEXSTRIKE_TOKEN`` as a bearer token. Both the
    transport and the session are closed when the ``async with`` block exits.
    """
    auth = {"Authorization": f"Bearer {HEXSTRIKE_TOKEN}"}
    transport = sse_client(MCP_LOCAL_URL, headers=auth)
    async with transport as (reader, writer), ClientSession(reader, writer) as mcp_session:
        await mcp_session.initialize()
        yield mcp_session
def mcp_tools_to_openai(mcp_tools) -> list[dict]:
    """Translate MCP tool descriptors into OpenAI-style function-tool specs.

    Each input object is read for ``name``, ``description`` and ``inputSchema``.
    A missing description becomes ``""`` and descriptions are cut to 1024
    characters; a missing schema becomes an empty object schema.
    """
    specs = []
    for tool in mcp_tools:
        summary = tool.description or ""
        schema = tool.inputSchema
        if not schema:
            # Fresh dict per tool so specs never share a mutable fallback.
            schema = {"type": "object", "properties": {}}
        function_spec = {
            "name": tool.name,
            "description": summary[:1024],
            "parameters": schema,
        }
        specs.append({"type": "function", "function": function_spec})
    return specs
def _history_to_messages(history, limit: int = 10) -> list[dict]:
    """Turn the last ``limit`` Gradio history entries into plain chat messages."""
    converted = []
    for entry in (history or [])[-limit:]:
        role = entry.get("role")
        content = entry.get("content")
        # Only plain-text turns are forwarded; entries whose content is not a
        # string (presumably files/components - verify against Gradio) are
        # skipped rather than sent to the LLM in a shape it cannot parse.
        if isinstance(role, str) and isinstance(content, str):
            converted.append({"role": role, "content": content})
    return converted


def _parse_tool_args(raw) -> dict:
    """Decode tool-call arguments into a dict.

    Accepts a JSON string, an already-decoded dict, or an empty value.
    Raises ``ValueError``/``TypeError`` when the payload is not a JSON object.
    """
    if raw is None or raw == "":
        return {}
    if isinstance(raw, dict):
        return raw
    parsed = json.loads(raw)
    if not isinstance(parsed, dict):
        raise ValueError(f"expected a JSON object, got {type(parsed).__name__}")
    return parsed


async def _call_tool_as_text(mcp, name: str, args: dict) -> str:
    """Run one MCP tool and flatten its result (or its failure) to text."""
    try:
        result = await mcp.call_tool(name, args)
        return "\n".join(
            c.text for c in result.content if getattr(c, "text", None)
        ) or json.dumps(result.model_dump(), default=str)
    except Exception as e:
        # Tool failures are reported back to the model, not raised to the UI.
        return f"[tool error] {type(e).__name__}: {e}"


async def run_agent(user_prompt: str, history: list[dict]) -> str:
    """Answer one chat message by looping LLM -> tool calls -> LLM.

    Args:
        user_prompt: The new user message.
        history: Earlier turns as ``{"role", "content"}`` dicts; only the last
            ten text turns are sent to the model.

    Returns:
        The model's final text, prefixed with a collapsible tool trace when any
        tool was called, or a human-readable warning/error message when the
        token is missing, the LLM call fails, or ``MAX_STEPS`` is exhausted.
    """
    if not HF_TOKEN:
        return ("⚠️ The `HF_TOKEN` Space secret isn't set. Add it under "
                "Settings → Variables and secrets to enable the chat.")
    # NOTE(review): HF_PROVIDER is defined at module level but is not passed
    # here - confirm whether explicit provider routing was intended.
    client = InferenceClient(model=HF_MODEL, token=HF_TOKEN)
    trace: list[str] = []
    async with open_mcp() as mcp:
        tools_resp = await mcp.list_tools()
        tools = mcp_tools_to_openai(tools_resp.tools)
        trace.append(f"📚 {len(tools)} HexStrike tools available")
        messages: list[dict] = [{"role": "system", "content": SYSTEM_PROMPT}]
        messages.extend(_history_to_messages(history))
        messages.append({"role": "user", "content": user_prompt})
        for step in range(MAX_STEPS):
            try:
                # chat_completion is a blocking call; running it in a worker
                # thread keeps this event loop free to service the MCP session.
                resp = await asyncio.to_thread(
                    client.chat_completion,
                    messages=messages,
                    tools=tools,
                    tool_choice="auto",
                    max_tokens=1024,
                    temperature=0.2,
                )
            except Exception as e:
                return f"❌ LLM call failed: `{type(e).__name__}: {e}`"
            msg = resp.choices[0].message
            messages.append(msg.model_dump() if hasattr(msg, "model_dump") else dict(msg))
            tool_calls = getattr(msg, "tool_calls", None) or []
            if not tool_calls:
                final = msg.content or "(no content)"
                # trace[0] is always the tool count; more entries mean tools ran.
                if len(trace) > 1:
                    return ("<details><summary>🔧 Tool trace</summary>\n\n```\n"
                            + "\n".join(trace) + "\n```\n</details>\n\n" + final)
                return final
            for call in tool_calls:
                name = call.function.name
                try:
                    args = _parse_tool_args(call.function.arguments)
                except (TypeError, ValueError) as e:
                    # Do not run a tool with guessed/empty arguments; tell the
                    # model what was wrong so it can retry.
                    trace.append(f"step {step}: {name}(<invalid arguments>)")
                    log.warning("Invalid arguments for %s: %s", name, e)
                    text = f"[tool error] invalid arguments for {name}: {e}"
                else:
                    trace.append(f"step {step}: {name}({args})")
                    log.info("🔧 %s(%s)", name, args)
                    text = await _call_tool_as_text(mcp, name, args)
                messages.append({
                    "role": "tool",
                    "tool_call_id": call.id,
                    "name": name,
                    "content": text[:8000],
                })
    return f"⚠️ Hit the {MAX_STEPS}-step tool-loop cap without a final answer."
def chat_fn(message: str, history: list[dict]) -> str:
    """Synchronous chat callback: run the async agent to completion and return its reply."""
    prior_turns = history if history else []
    agent_coro = run_agent(message, prior_turns)
    return asyncio.run(agent_coro)
# ─── Gradio UI (Gradio 5.x API) ──────────────────────────────────────────────
# Chat page: a Markdown banner followed by a ChatInterface bound to chat_fn.
with gr.Blocks(title="HexStrike AI", theme=gr.themes.Soft()) as demo:
    # The banner text is kept flush-left inside the literal so its content is
    # exactly what is rendered; only the mis-encoded characters were repaired.
    gr.Markdown(
        f"""
# 🛡️ HexStrike AI Chat
Powered by **{HF_MODEL}** + 150+ HexStrike tools via MCP.
⚠️ **Authorized targets only.** Only scan systems you own or have
written permission to test. The agent will refuse ambiguous targets.
💡 *"My lab box is 10.0.0.5 — fingerprint it lightly and tell me what
services are exposed."*
"""
    )
    gr.ChatInterface(
        fn=chat_fn,
        type="messages",  # Gradio 5.x: OpenAI-style history
        examples=[
            "List the tools you have and group them by category.",
            "I own scanme.nmap.org. Do a light nmap -sT -sV on it.",
            "Fingerprint the web app at https://my-lab.example (I own it).",
        ],
        cache_examples=False,
    )
# ─── FastAPI app: Gradio + reverse-proxy for /servers/hexstrike/* ────────────
app = FastAPI(title="HexStrike AI Space")


@app.get("/healthz")
async def healthz():
    """Report whether the service at ``127.0.0.1:8888/health`` answers successfully.

    Returns:
        ``{"ok": True, "hexstrike": <upstream JSON>, "mcp_proxy_pid": ...,
        "mcp_proxy_alive": ...}`` on success, otherwise
        ``{"ok": False, "error": "<ExceptionType>: <message>"}``. The HTTP
        status of this endpoint itself is not changed by the outcome.
    """
    try:
        async with httpx.AsyncClient(timeout=3) as c:
            r = await c.get("http://127.0.0.1:8888/health")
        # An error status with a JSON body must not be reported as healthy.
        r.raise_for_status()
        upstream = r.json()
    except Exception as e:
        # Include the exception type: some timeout errors have an empty message.
        return {"ok": False, "error": f"{type(e).__name__}: {e}"}
    return {
        "ok": True,
        "hexstrike": upstream,
        "mcp_proxy_pid": MCP_PROC.pid,
        # poll() returns None while the child process is still running.
        "mcp_proxy_alive": MCP_PROC.poll() is None,
    }
@app.api_route("/servers/hexstrike/{path:path}",
               methods=["GET", "POST", "OPTIONS"])
async def mcp_reverse_proxy(path: str, request: Request):
    """Reverse-proxy MCP onto loopback mcp-proxy. SSE streams without buffering.

    Non-OPTIONS requests must carry ``Authorization: Bearer <HEXSTRIKE_TOKEN>``;
    otherwise a 401 is returned without contacting the upstream. If the
    upstream cannot be reached a 502 is returned. On success the upstream
    status, headers and raw body bytes are streamed back to the caller.
    """
    # Enforce the token here rather than relying on the upstream process.
    # NOTE(review): mcp-proxy is not known to validate API_ACCESS_TOKEN on
    # inbound requests - confirm; checking here holds either way.
    # OPTIONS is exempt because CORS preflights carry no credentials.
    if request.method != "OPTIONS":
        scheme, _, supplied = request.headers.get("authorization", "").partition(" ")
        token_ok = scheme.lower() == "bearer" and hmac.compare_digest(
            supplied.strip().encode("utf-8"), HEXSTRIKE_TOKEN.encode("utf-8")
        )
        if not token_ok:
            return JSONResponse(
                {"error": "missing or invalid bearer token"},
                status_code=401,
                headers={"WWW-Authenticate": "Bearer"},
            )

    url = f"http://127.0.0.1:{MCP_INTERNAL_PORT}/servers/hexstrike/{path}"
    headers = {k: v for k, v in request.headers.items()
               if k.lower() not in ("host", "content-length")}
    # httpx advertises its own Accept-Encoding when none is given; pin it to
    # identity so the caller never receives an encoding it did not ask for.
    if not any(k.lower() == "accept-encoding" for k in headers):
        headers["accept-encoding"] = "identity"
    body = await request.body()

    client = httpx.AsyncClient(timeout=None)
    try:
        req = client.build_request(
            request.method, url,
            headers=headers, content=body, params=request.query_params,
        )
        upstream = await client.send(req, stream=True)
    except httpx.HTTPError as exc:
        # The streamer below never runs on this path, so close the client here.
        await client.aclose()
        log.warning("MCP upstream unreachable: %s: %s", type(exc).__name__, exc)
        return JSONResponse({"error": "MCP upstream unavailable"}, status_code=502)
    except BaseException:
        await client.aclose()
        raise

    async def streamer():
        try:
            async for chunk in upstream.aiter_raw():
                yield chunk
        finally:
            await upstream.aclose()
            await client.aclose()

    # aiter_raw() yields the body still content-encoded, so Content-Encoding is
    # passed through; only hop-by-hop headers are dropped.
    excluded = {"transfer-encoding", "connection"}
    resp_headers = {k: v for k, v in upstream.headers.items()
                    if k.lower() not in excluded}
    return StreamingResponse(
        streamer(),
        status_code=upstream.status_code,
        headers=resp_headers,
        media_type=upstream.headers.get("content-type"),
    )
# Gradio is mounted last so the FastAPI routes registered above keep precedence.
app = gr.mount_gradio_app(app, demo, path="/")

if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
    )