import time
import uuid
import gradio as gr
from dotenv import load_dotenv
from agent import Agent, FETCH_WEBPAGE_TOOL, SHELL_TOOL, READ_TOOL, FINAL_MESSAGE_TOOL
import os
from pathlib import Path
# Runs before the os.getenv("OPENAI_*") lookups further down; presumably this
# populates the process environment from a local .env file (python-dotenv).
load_dotenv()
# Registers the "static/" directory with Gradio as a static path. The landing
# page below references "/gradio_api/file=static/svg/logo.svg".
# NOTE(review): the path is relative, so it presumably resolves against the
# current working directory rather than this file's directory — confirm.
gr.set_static_paths("static/")
# System prompt passed to the Agent constructor as `system_prompt`.
# It describes the agent's role and explains that the `final_message` tool
# must be called to end a turn.
# NOTE(review): the text contains typos ("bulnerability", "Than THinks") and
# some ungrammatical sentences. It is left untouched here because every
# character is model-visible runtime data; fix deliberately and re-test.
_SYSTEM_PROMPT = """\
You are OpenMythos, a powerful AI agent specialized in cybersecurity-related tasks.
You have access to tools that you can use to accomplish your goals.
You are a multi-level vulnerability analysis, a visual dependency risk path, a declared threat level then generates an instant, verifiable hotfix patch before threat actors can exploit it.
When finding exploits list it in multi step use tools to search for something specific if needed.
After finding one bulnerability, you will generate a patch for it and provide a detailed explanation of the vulnerability, including its potential impact and how the patch mitigates the risk.
Than THinks again and search for new vulnerabilities and repeat the process until all vulnerabilities are found.
Don't go much looped if you find yourself in a loop just call the `final_message` tool to end the conversation.
=== IMPORTANT: How to end the conversation ===
You MUST call the `final_message` tool when you have completed your response and want to end.
If you do NOT call `final_message`, you will be stuck in a loop:
- You respond → system waits for final_message → you did not call it
- → system sends your response back to you → you must respond again
- → this repeats until you call `final_message`
To break out of the loop, simply call `final_message` with no arguments.
Only call `final_message` when you are done or already responded or stuck in a loop.
"""
# Module-level agent shared by every request handled by this process.
# Endpoint, credentials and model name come from environment variables;
# os.getenv returns None for any variable that is not set.
# NOTE(review): how Agent treats a None base_url/api_key/model is not visible
# from this file — confirm it fails with a clear error.
agent = Agent(
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
model=os.getenv("OPENAI_MODEL"),
system_prompt=_SYSTEM_PROMPT,
)
# Register the four built-in tools imported from the agent module.
agent.register_tool(FETCH_WEBPAGE_TOOL, SHELL_TOOL, READ_TOOL, FINAL_MESSAGE_TOOL)
# Presumably registers tools exposed by configured MCP servers — confirm in agent.py.
agent.register_all_mcp()
# Presumably marks `final_message` as the tool that ends a turn, matching the
# instructions in the system prompt — confirm in agent.py.
agent.set_final_message_tool()
# Load the browser-side JS snippets from static/js next to this file.
# They are read once at import time and passed to Gradio event listeners via
# their `js=` argument. The encoding is pinned to UTF-8 so decoding does not
# depend on the platform's locale default.
_js_dir = Path(__file__).parent / "static" / "js"
JS_LOAD_STATE = (_js_dir / "storage.load.js").read_text(encoding="utf-8")
JS_SAVE_STATE = (_js_dir / "storage.save.js").read_text(encoding="utf-8")
LANDING_PAGE_SCRIPT = (_js_dir / "landing.js").read_text(encoding="utf-8")
def _conv_choices(state_value):
    """Build the update for the sidebar conversation radio.

    Args:
        state_value: Session state dict. Reads ``conversations`` (a list of
            dicts with ``key`` and optionally ``label`` / ``last_updated``)
            and ``conversation_id``.

    Returns:
        A ``gr.update`` whose choices are ``(label, key)`` pairs ordered from
        most to least recently updated, with the active conversation selected
        when it is one of the choices and nothing selected otherwise.
    """
    conversations = sorted(
        state_value.get("conversations", []),
        key=lambda conv: conv.get("last_updated", 0),
        reverse=True,
    )
    # Entries restored from browser storage may lack a label; fall back to the
    # key so one incomplete entry does not break the whole sidebar.
    choices = [(conv.get("label") or conv["key"], conv["key"]) for conv in conversations]
    known_keys = {key for _, key in choices}
    active_id = state_value.get("conversation_id")
    return gr.update(
        choices=choices,
        # Never preselect an id that is not among the choices (e.g. stale id).
        value=active_id if active_id in known_keys else None,
    )
class GradioEvents:
    """Event handlers for the chatbot UI.

    Every handler is static and works on the per-session ``state_value`` dict,
    which holds ``conversations`` (sidebar entries with ``label``, ``key`` and
    ``last_updated``), ``conversation_contexts`` (conversation id ->
    ``{"history": [...]}``) and ``conversation_id`` (the active id, or ``""``
    when no conversation is selected).
    """

    @staticmethod
    def _truncate_output(content, limit=500):
        """Return *content* cut to *limit* characters, marking any truncation."""
        if len(content) <= limit:
            return content
        return content[:limit] + "\n..."

    @staticmethod
    def stream_response(message, state_value):
        """Stream a chat completion into the active conversation.

        Generator handler. Creates a conversation when none is active, appends
        the user message, then consumes events from ``agent.stream`` and turns
        them into chat bubbles. Handled event types: ``reasoning`` (animated
        spinner bubble), ``text`` (appended to the current answer bubble),
        ``tool_call`` / ``tool_output`` (one titled bubble per tool call),
        ``error`` (red error bubble, ends the stream) and ``done``.

        Args:
            message: Text typed by the user; blank input is ignored.
            state_value: Session state dict (mutated in place).

        Yields:
            Dicts mapping UI components to ``gr.update`` objects, or
            ``gr.skip()`` when the message is blank.
        """
        import html  # local import: only used to escape error text below

        if not message or not message.strip():
            yield gr.skip()
            return

        conv_id = state_value.get("conversation_id")
        if not conv_id:
            # No active conversation: create one labelled by the message start.
            conv_id = str(uuid.uuid4())
            state_value["conversation_id"] = conv_id
            state_value["conversations"].append({
                "label": message[:30],
                "key": conv_id,
                "last_updated": int(time.time() * 1000),
            })
            state_value["conversation_contexts"][conv_id] = {"history": []}
        else:
            state_value["conversation_contexts"].setdefault(conv_id, {"history": []})
            # Update last_updated for existing conversation
            for conv in state_value["conversations"]:
                if conv["key"] == conv_id:
                    conv["last_updated"] = int(time.time() * 1000)
                    break

        ctx = state_value["conversation_contexts"][conv_id]
        # Give the conversation a label if it does not have one yet.
        for conv in state_value["conversations"]:
            if conv["key"] == conv_id and not conv.get("label"):
                conv["label"] = message[:30]
                break

        # Contexts restored from browser storage may lack the history list.
        ctx.setdefault("history", []).append({"role": "user", "content": message})
        yield {
            msg: gr.update(value=""),
            chatbot: gr.update(value=ctx["history"]),
            state: gr.update(value=state_value),
            conv_choice: _conv_choices(state_value),
            send_btn: gr.update(visible=False),
            stop_btn: gr.update(visible=True),
        }

        # Build display as separate titled messages (smolagents style)
        display_messages: list[dict] = list(ctx["history"])
        text_msg_idx: int | None = None
        thinking_msg_idx: int | None = None
        tool_call_idx: int | None = None
        # Remembered from the tool_call event. Parsing the name back out of the
        # bubble title breaks after the first partial output rewrites the title.
        tool_call_name: str | None = None
        spinner_frames = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
        spinner_idx = 0

        def remove_spinner():
            """Drop the spinner bubble and shift any index that pointed past it."""
            nonlocal thinking_msg_idx, text_msg_idx, tool_call_idx
            if thinking_msg_idx is None:
                return
            removed = thinking_msg_idx
            display_messages.pop(removed)
            thinking_msg_idx = None
            if text_msg_idx is not None and text_msg_idx > removed:
                text_msg_idx -= 1
            if tool_call_idx is not None and tool_call_idx > removed:
                tool_call_idx -= 1

        def finished(messages):
            """Final UI update: show *messages* and restore the Send button."""
            return {
                chatbot: gr.update(value=messages),
                state: gr.update(value=state_value),
                send_btn: gr.update(visible=True),
                stop_btn: gr.update(visible=False),
            }

        try:
            for ev in agent.stream(ctx["history"]):
                t = ev["type"]
                if t == "reasoning":
                    spinner_idx += 1
                    frame = spinner_frames[spinner_idx % len(spinner_frames)]
                    content = f'<span class="thinking-indicator">{frame} Thinking...</span>'
                    if thinking_msg_idx is not None:
                        display_messages[thinking_msg_idx]["content"] = content
                    else:
                        display_messages.append(
                            {"role": "assistant", "content": content, "metadata": {}})
                        thinking_msg_idx = len(display_messages) - 1
                elif t == "text":
                    # Remove spinner message if showing — it was a separate bubble
                    remove_spinner()
                    if text_msg_idx is not None:
                        display_messages[text_msg_idx]["content"] += ev["content"]
                    else:
                        display_messages.append(
                            {"role": "assistant", "content": ev["content"], "metadata": {}})
                        text_msg_idx = len(display_messages) - 1
                elif t == "tool_call":
                    # Remove spinner if present
                    remove_spinner()
                    # Finalize any in-flight text message (keep if it has real content)
                    if text_msg_idx is not None:
                        if not display_messages[text_msg_idx].get("content", "").strip():
                            display_messages.pop(text_msg_idx)
                        text_msg_idx = None
                    tool_call_name = ev["name"]
                    display_messages.append({
                        "role": "assistant",
                        "content": f"```\n{tool_call_name}({ev['arguments']})\n```\n⏳ Running...",
                        "metadata": {"title": f"🛠️ Used tool {tool_call_name}"},
                    })
                    tool_call_idx = len(display_messages) - 1
                elif t == "tool_output":
                    if tool_call_idx is not None:
                        output = ev["content"]
                        preview = GradioEvents._truncate_output(output)
                        bubble = display_messages[tool_call_idx]
                        bubble["content"] = (
                            f"```\n{tool_call_name}({ev['arguments']})\n```\n\n"
                            f"**Output:**\n```\n{preview}\n```"
                        )
                        bubble["metadata"] = {
                            "title": f"🛠️ {tool_call_name} — {len(output)} chars",
                        }
                        # Only clear tool_call_idx if NOT partial (final output)
                        if not ev.get("partial"):
                            tool_call_idx = None
                elif t == "error":
                    # Do not leave a frozen spinner above the error bubble.
                    remove_spinner()
                    # Escape the text: it is interpolated into raw HTML.
                    error_text = html.escape(str(ev["content"]))
                    display_messages.append({
                        "role": "assistant",
                        "content": f'<span style="color: var(--color-red-500)">{error_text}</span>',
                        "metadata": {"title": "💥 Error"},
                    })
                    yield finished(display_messages)
                    return
                elif t == "done":
                    break

                # Update history in real-time so state always has latest messages.
                # The spinner is display-only and is left out, so it is never
                # persisted or sent back to the model, even if the stream is
                # cancelled while it is showing.
                ctx["history"] = [
                    m for i, m in enumerate(display_messages) if i != thinking_msg_idx
                ]
                yield {
                    chatbot: gr.update(value=display_messages),
                    state: gr.update(value=state_value),
                }

            # Final sync (ensures tool output messages are saved)
            remove_spinner()
            ctx["history"] = display_messages
            yield finished(ctx["history"])
        except Exception as exc:
            # Top-level boundary of the handler: surface the failure in the
            # chat instead of leaving the UI stuck in the streaming state.
            remove_spinner()
            display_messages.append({
                "role": "assistant",
                "content": f'<span style="color: var(--color-red-400)">{html.escape(str(exc))}</span>',
                "metadata": {"title": "💥 Error"},
            })
            ctx["history"] = display_messages
            yield finished(display_messages)

    @staticmethod
    def new_chat(state_value):
        """Deselect the active conversation and clear the chat view.

        Returns:
            Updates for the conversation radio, the chatbot and the state.
        """
        state_value["conversation_id"] = ""
        return (
            gr.update(value=None),
            gr.update(value=None),
            gr.update(value=state_value),
        )

    @staticmethod
    def select_conversation(choice, state_value):
        """Make *choice* the active conversation and show its history.

        Returns:
            ``gr.skip()`` when nothing is chosen or the choice is already
            active; otherwise updates for the chatbot and the state.
        """
        if not choice:
            return gr.skip()
        if choice == state_value.get("conversation_id"):
            return gr.skip()
        state_value["conversation_id"] = choice
        ctx = state_value["conversation_contexts"].get(choice, {})
        return (
            gr.update(value=ctx.get("history", [])),
            gr.update(value=state_value),
        )

    @staticmethod
    def delete_selected_conversation(choice, state_value):
        """Delete conversation *choice* and its context.

        Returns:
            ``gr.skip()`` when nothing is chosen; otherwise updates for the
            conversation radio, the chatbot (cleared only when the deleted
            conversation was the active one) and the state.
        """
        if not choice:
            return gr.skip()
        state_value["conversation_contexts"].pop(choice, None)
        state_value["conversations"] = [
            c for c in state_value["conversations"] if c["key"] != choice
        ]
        was_active = state_value.get("conversation_id") == choice
        if was_active:
            state_value["conversation_id"] = ""
            return (
                _conv_choices(state_value),
                gr.update(value=None),
                gr.update(value=state_value),
            )
        return (
            _conv_choices(state_value),
            gr.skip(),
            gr.update(value=state_value),
        )

    @staticmethod
    def cancel_stream(state_value):
        """Mark the current assistant message as cancelled.

        Always restores the Send/Stop buttons so the input row cannot stay in
        the streaming state, even when there is no active conversation or its
        context is missing.

        Returns:
            Updates for the chatbot, the state, the Send button and the Stop
            button, in that order.
        """
        show_send = gr.update(visible=True)
        hide_stop = gr.update(visible=False)
        conv_id = state_value.get("conversation_id")
        ctx = state_value.get("conversation_contexts", {}).get(conv_id) if conv_id else None
        if ctx is None:
            return (gr.skip(), gr.skip(), show_send, hide_stop)
        history = ctx.get("history")
        if history and history[-1].get("role") == "assistant":
            last = history[-1]
            last["metadata"] = last.get("metadata", {})
            last["metadata"]["footer"] = "Chat completion paused"
        return (
            gr.update(value=ctx.get("history", [])),
            gr.update(value=state_value),
            show_send,
            hide_stop,
        )

    @staticmethod
    def load_from_js(serialised_json, state_value):
        """Receive the JSON string that JS_LOAD_STATE returned, merge into state.

        Input that is empty, is not valid JSON, or does not decode to an
        object is ignored. Fields of the wrong type fall back to empty values.

        Returns:
            Updates for the conversation radio and the state, or a pair of
            ``gr.skip()`` when the input is ignored.
        """
        import json
        if not serialised_json:
            return gr.skip(), gr.skip()
        try:
            loaded = json.loads(serialised_json)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            return gr.skip(), gr.skip()
        if not isinstance(loaded, dict):
            # e.g. "null" or a JSON array: nothing usable to merge.
            return gr.skip(), gr.skip()
        conversations = loaded.get("conversations")
        contexts = loaded.get("conversation_contexts")
        state_value["conversations"] = conversations if isinstance(conversations, list) else []
        state_value["conversation_contexts"] = contexts if isinstance(contexts, dict) else {}
        return _conv_choices(state_value), gr.update(value=state_value)

    @staticmethod
    def prepare_save(state_value):
        """Serialise state to JSON so JS_SAVE_STATE can write it to localStorage.

        Only ``conversations`` and ``conversation_contexts`` are included; the
        active ``conversation_id`` is not saved.
        """
        import json
        return json.dumps({
            "conversations": state_value.get("conversations", []),
            "conversation_contexts": state_value.get("conversation_contexts", {}),
        })
# UI definition: a sidebar with the conversation list and a chat column with
# the landing page, chatbot and input row, followed by all event wiring.
with gr.Blocks(fill_width=True, title="OpenMythos Demo") as demo:
    # Session state read and mutated by every GradioEvents handler.
    state = gr.State({
        "conversation_contexts": {},
        "conversations": [],
        "conversation_id": "",
    })
    # Hidden textboxes used to pass JSON between the Python handlers and the
    # JS snippets (JS_LOAD_STATE / JS_SAVE_STATE).
    js_load_output = gr.Textbox(visible=False, elem_id="js-load-output")
    js_save_input = gr.Textbox(visible=False, elem_id="js-save-input")

    with gr.Row(elem_id="main-row"):
        with gr.Sidebar(open=False):
            new_chat_btn = gr.Button(
                value="New Conversation",
                variant="primary",
            )
            conv_choice = gr.Radio(
                choices=[],
                label=None,
                interactive=True,
                elem_id="conversations-radio",
            )
            delete_btn = gr.Button(
                value="Delete Selected",
                variant="stop",
                visible=False
            )

        with gr.Column(elem_id="chat-column"):
            # Landing page shown when chat is empty with added hyperlinks
            landing_page = gr.HTML(
                value="""
<div id="landing-page">
<div class="landing-content">
<div class="landing-logo">
<img src="/gradio_api/file=static/svg/logo.svg" alt="MythosHarness" width="420" height="70" />
</div>
</div>
<div class="landing-prompt">
<p>Made with ❤️ by <a href="http://huggingface.co/KingNish" target="_blank" style="color: var(--primary-500, #ff4b4b); text-decoration: underline;">KingNish</a> and <a href="https://huggingface.co/himanshu17HF" target="_blank" style="color: var(--primary-500, #ff4b4b); text-decoration: underline;">Himanshu</a></p>
</div>
</div>
""",
                elem_id="landing-page-container",
            )
            chatbot = gr.Chatbot(
                elem_id="chatbot",
                show_label=False,
                buttons=[],
                layout="bubble",
                autoscroll=True
            )
            with gr.Row(elem_id="input-row"):
                msg = gr.Textbox(
                    placeholder="Enter your message here...",
                    show_label=False,
                    scale=4,
                    container=False,
                    max_lines=10,
                )
                send_btn = gr.Button(
                    "Send",
                    variant="primary",
                    scale=0,
                    min_width=40,
                    elem_id="send-btn",
                )
                stop_btn = gr.Button(
                    "Stop",
                    variant="stop",
                    scale=0,
                    min_width=40,
                    visible=False,
                    elem_id="stop-btn",
                )

    # --- Conversation management -------------------------------------------
    new_chat_btn.click(
        fn=GradioEvents.new_chat,
        inputs=[state],
        outputs=[conv_choice, chatbot, state],
    )
    conv_choice.change(
        fn=GradioEvents.select_conversation,
        inputs=[conv_choice, state],
        outputs=[chatbot, state],
    )
    delete_btn.click(
        fn=GradioEvents.delete_selected_conversation,
        inputs=[conv_choice, state],
        outputs=[conv_choice, chatbot, state],
    )

    # --- Streaming -----------------------------------------------------------
    # The same handler is reachable from the Send button and from pressing
    # Enter in the textbox; keep both events so Stop can cancel either one.
    submit_event = send_btn.click(
        fn=GradioEvents.stream_response,
        inputs=[msg, state],
        outputs=[msg, chatbot, state, conv_choice, send_btn, stop_btn],
    )
    enter_event = msg.submit(
        fn=GradioEvents.stream_response,
        inputs=[msg, state],
        outputs=[msg, chatbot, state, conv_choice, send_btn, stop_btn],
    )
    stop_btn.click(
        fn=GradioEvents.cancel_stream,
        inputs=[state],
        outputs=[chatbot, state, send_btn, stop_btn],
        cancels=[submit_event, enter_event],
    )

    # --- Persistence ---------------------------------------------------------
    # On every state change: serialise to JSON in Python, then hand the JSON to
    # the JS_SAVE_STATE snippet through the hidden textbox.
    state.change(
        fn=GradioEvents.prepare_save,
        inputs=[state],
        outputs=[js_save_input],
    ).then(
        fn=lambda x: x,
        inputs=[js_save_input],
        outputs=[js_save_input],
        js=JS_SAVE_STATE,
    )
    # On page load: run the landing-page script, let JS_LOAD_STATE fill the
    # hidden textbox, then merge its JSON into the session state.
    demo.load(
        fn=lambda: None,
        inputs=None,
        outputs=None,
        js=LANDING_PAGE_SCRIPT,
    ).then(
        fn=lambda x: x,
        inputs=[js_load_output],
        outputs=[js_load_output],
        js=JS_LOAD_STATE,
    ).then(
        fn=GradioEvents.load_from_js,
        inputs=[js_load_output, state],
        outputs=[conv_choice, state],
    )
# Base theme with square corners, passed to launch() below.
theme = gr.themes.Base(radius_size="none")

if __name__ == "__main__":
    # Start the app only when run as a script. The queue and the thread pool
    # are each capped at 100.
    demo.queue(default_concurrency_limit=100, max_size=100).launch(
        ssr_mode=False,
        max_threads=100,
        css_paths="app.css",
        theme=theme,
    )