File size: 18,009 Bytes
0f2dc9a
 
 
 
72cc67e
0f2dc9a
496fee6
0f2dc9a
 
496fee6
0f2dc9a
a991154
 
 
 
 
69cdfce
 
e398a68
 
 
 
 
 
 
 
a991154
 
 
 
 
 
 
 
 
 
e3b02b7
0f2dc9a
e3b02b7
 
a991154
0f2dc9a
72cc67e
2407c30
72cc67e
0f2dc9a
496fee6
 
 
 
 
8dc3647
0f2dc9a
 
8dc3647
0f2dc9a
8dc3647
2cc57f1
0f2dc9a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8dc3647
0f2dc9a
 
 
 
 
71623d4
0f2dc9a
8dc3647
 
 
 
 
0f2dc9a
 
 
 
 
 
 
 
 
 
 
 
714c652
 
 
fa6811f
714c652
95b1c76
 
0f2dc9a
 
e3b02b7
 
0f2dc9a
e3b02b7
95b1c76
714c652
fa6811f
 
e3b02b7
714c652
fa6811f
e3b02b7
 
fa6811f
 
 
 
 
 
 
 
 
714c652
 
 
 
 
e3b02b7
 
fa6811f
 
 
 
 
 
 
714c652
 
 
fa6811f
714c652
 
 
 
 
 
 
 
 
 
 
e3b02b7
 
714c652
 
 
 
 
 
10fbb32
714c652
 
 
 
 
10fbb32
 
 
e3b02b7
 
714c652
0f2dc9a
e3b02b7
714c652
e3b02b7
 
714c652
e3b02b7
 
 
 
 
 
 
 
0f2dc9a
a765a9d
 
 
0f2dc9a
714c652
0f2dc9a
 
 
a765a9d
714c652
 
0f2dc9a
 
 
 
 
 
 
a6837f3
714c652
0f2dc9a
a991154
714c652
0f2dc9a
a765a9d
0f2dc9a
714c652
0f2dc9a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2cc57f1
0f2dc9a
 
2cc57f1
 
0f2dc9a
 
 
 
 
 
 
 
 
 
2cc57f1
0f2dc9a
2cc57f1
0f2dc9a
2cc57f1
0f2dc9a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71623d4
0f2dc9a
 
71623d4
 
 
 
0f2dc9a
71623d4
 
 
 
 
 
 
0f2dc9a
 
71623d4
 
 
 
 
 
 
 
 
d8c251e
0f2dc9a
 
 
 
 
 
71623d4
 
 
0f2dc9a
95b1c76
0f2dc9a
 
 
 
 
 
ab7e40e
0f2dc9a
 
 
 
 
 
ab7e40e
0f2dc9a
 
2cc57f1
0f2dc9a
a6eb102
8cd17a1
 
 
 
 
95b1c76
8cd17a1
 
8dc3647
a6eb102
8dc3647
8cd17a1
 
 
 
 
0f2dc9a
 
 
a6837f3
10fbb32
d8c251e
0f2dc9a
ab7e40e
0f2dc9a
8cd17a1
0f2dc9a
 
 
8cd17a1
0f2dc9a
 
 
 
 
2cc57f1
 
0f2dc9a
 
 
 
 
2cc57f1
0f2dc9a
2cc57f1
0f2dc9a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71623d4
0f2dc9a
71623d4
 
 
 
 
 
0f2dc9a
71623d4
0f2dc9a
8dc3647
 
 
 
 
71623d4
 
 
 
 
 
 
0f2dc9a
 
 
 
 
 
a6eb102
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
import time
import uuid
import gradio as gr
from dotenv import load_dotenv
from agent import Agent, FETCH_WEBPAGE_TOOL, SHELL_TOOL, READ_TOOL, FINAL_MESSAGE_TOOL
import os
from pathlib import Path

# Load variables from a local .env file into the process environment; the
# OPENAI_* settings are read with os.getenv() further down in this module.
load_dotenv()
# Register the "static/" directory with Gradio as a static-file path (the
# landing page references /gradio_api/file=static/svg/logo.svg).
gr.set_static_paths("static/")

# System prompt handed to the Agent constructor below. It describes the
# assistant's role and instructs the model to finish every turn by calling the
# `final_message` tool.
# NOTE(review): the prompt text contains typos ("bulnerability", "Than THinks")
# and some ungrammatical sentences. It is runtime text sent to the model, so it
# is left untouched here — fix deliberately and re-check agent behaviour.
_SYSTEM_PROMPT = """\
You are OpenMythos, a powerful AI agent specialized in cybersecurity-related tasks.

You have access to tools that you can use to accomplish your goals.

You are a multi-level vulnerability analysis, a visual dependency risk path, a declared threat level then generates an instant, verifiable hotfix patch before threat actors can exploit it.

When finding exploits list it in multi step use tools to search for something specific if needed.

After finding one bulnerability, you will generate a patch for it and provide a detailed explanation of the vulnerability, including its potential impact and how the patch mitigates the risk.

Than THinks again and search for new vulnerabilities and repeat the process until all vulnerabilities are found.

Don't go much looped if you find yourself in a loop just call the `final_message` tool to end the conversation.

=== IMPORTANT: How to end the conversation ===
You MUST call the `final_message` tool when you have completed your response and want to end.
If you do NOT call `final_message`, you will be stuck in a loop:
  - You respond → system waits for final_message → you did not call it
  - → system sends your response back to you → you must respond again
  - → this repeats until you call `final_message`
To break out of the loop, simply call `final_message` with no arguments.
Only call `final_message` when you are done or already responded or stuck in a loop.
"""

# Module-level agent used by GradioEvents.stream_response. Endpoint, key and
# model come from the environment; os.getenv() returns None for any variable
# that is not set, and that None is passed straight through to Agent.
agent = Agent(
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    model=os.getenv("OPENAI_MODEL"),
    system_prompt=_SYSTEM_PROMPT,
)
# Register the four tool definitions imported from the local `agent` module.
agent.register_tool(FETCH_WEBPAGE_TOOL, SHELL_TOOL, READ_TOOL, FINAL_MESSAGE_TOOL)
# NOTE(review): presumably discovers and registers MCP-provided tools — confirm
# against the agent module.
agent.register_all_mcp()
# NOTE(review): presumably marks `final_message` as the turn-ending tool that
# the system prompt describes — confirm against the agent module.
agent.set_final_message_tool()

# Load the browser-side JS snippets from static/js next to this file. They are
# passed to Gradio event listeners through their `js=` argument further down.
# The encoding is pinned to UTF-8 so decoding does not depend on the platform's
# default locale encoding.
_js_dir = Path(__file__).parent / "static" / "js"
JS_LOAD_STATE = (_js_dir / "storage.load.js").read_text(encoding="utf-8")
JS_SAVE_STATE = (_js_dir / "storage.save.js").read_text(encoding="utf-8")
LANDING_PAGE_SCRIPT = (_js_dir / "landing.js").read_text(encoding="utf-8")


def _conv_choices(state_value):
    """Build the update for the conversation radio list.

    Conversations are listed most-recently-updated first (entries without a
    "last_updated" value sort as 0). Each choice is a (label, key) pair and
    the selected value is the active conversation id, or None when that id
    is empty or missing.
    """
    def recency(conv):
        return conv.get("last_updated", 0)

    newest_first = sorted(state_value["conversations"], key=recency, reverse=True)
    options = [(conv["label"], conv["key"]) for conv in newest_first]
    selected = state_value.get("conversation_id") or None
    return gr.update(choices=options, value=selected)


class GradioEvents:
    """Event handlers for the chatbot UI."""

    @staticmethod
    def stream_response(message, state_value):
        """Stream a chat completion into the active conversation.

        Generator event handler. It creates the conversation on first use,
        appends the user message to its history, then consumes events from
        ``agent.stream`` and turns them into chat bubbles:

        * ``reasoning``   - an animated "Thinking..." placeholder bubble
        * ``text``        - assistant text, accumulated into one bubble
        * ``tool_call``   - a titled bubble showing the call as running
        * ``tool_output`` - fills that bubble with (at most 500 chars of) output
        * ``error``       - a titled error bubble; streaming stops
        * ``done``        - streaming stops

        Args:
            message: Text typed by the user. Blank or whitespace-only input
                is ignored.
            state_value: Session state dict holding "conversation_id",
                "conversations" and "conversation_contexts". It is mutated
                in place.

        Yields:
            ``gr.skip()`` for blank input; otherwise dicts keyed by output
            component (msg, chatbot, state, conv_choice, send_btn, stop_btn)
            describing incremental UI updates.
        """
        import html  # function-scope import, like the json imports in sibling handlers

        if not message or not message.strip():
            yield gr.skip()
            return

        if not state_value.get("conversation_id"):
            conv_id = str(uuid.uuid4())
            state_value["conversation_id"] = conv_id
            state_value["conversations"].append(
                {"label": message[:30], "key": conv_id, "last_updated": int(time.time() * 1000)})
            state_value["conversation_contexts"][conv_id] = {
                "history": []
            }
        else:
            conv_id = state_value["conversation_id"]
            state_value["conversation_contexts"].setdefault(
                conv_id, {"history": []})
            # Update last_updated for existing conversation
            for c in state_value["conversations"]:
                if c["key"] == conv_id:
                    c["last_updated"] = int(time.time() * 1000)
                    break

        ctx = state_value["conversation_contexts"][conv_id]

        # Give an unlabeled conversation a label taken from this message.
        for c in state_value["conversations"]:
            if c["key"] == conv_id and not c.get("label"):
                c["label"] = message[:30]
                break

        ctx["history"].append({"role": "user", "content": message})

        # Clear the input box and swap Send for Stop while streaming.
        yield {
            msg: gr.update(value=""),
            chatbot: gr.update(value=ctx["history"]),
            state: gr.update(value=state_value),
            conv_choice: _conv_choices(state_value),
            send_btn: gr.update(visible=False),
            stop_btn: gr.update(visible=True),
        }

        # Build display as separate titled messages (smolagents style)
        display_messages: list[dict] = list(ctx["history"])
        text_msg_idx: int | None = None
        thinking_msg_idx: int | None = None
        tool_call_idx: int | None = None
        # Name/arguments of the tool call currently shown. Kept in locals so
        # that partial outputs never have to parse them back out of the title.
        tool_name = ""
        tool_args = None
        spinner_frames = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
        spinner_idx = 0

        def drop_spinner():
            """Remove the "Thinking..." bubble, if shown, and fix up later indices."""
            nonlocal thinking_msg_idx, text_msg_idx, tool_call_idx
            if thinking_msg_idx is None:
                return
            removed = thinking_msg_idx
            display_messages.pop(removed)
            thinking_msg_idx = None
            # Anything that sat after the spinner moved up by one slot.
            if text_msg_idx is not None and text_msg_idx > removed:
                text_msg_idx -= 1
            if tool_call_idx is not None and tool_call_idx > removed:
                tool_call_idx -= 1

        def idle_update():
            """Final UI update: show the transcript and restore the Send button."""
            return {
                chatbot: gr.update(value=display_messages),
                state: gr.update(value=state_value),
                send_btn: gr.update(visible=True),
                stop_btn: gr.update(visible=False),
            }

        try:
            for ev in agent.stream(ctx["history"]):
                t = ev["type"]

                if t == "reasoning":
                    spinner_idx += 1
                    content = f"<span class=\"thinking-indicator\">{spinner_frames[spinner_idx % len(spinner_frames)]}  Thinking...</span>"
                    if thinking_msg_idx is not None:
                        display_messages[thinking_msg_idx]["content"] = content
                    else:
                        display_messages.append({"role": "assistant", "content": content, "metadata": {}})
                        thinking_msg_idx = len(display_messages) - 1

                elif t == "text":
                    # The spinner was a separate bubble; real text replaces it.
                    drop_spinner()

                    if text_msg_idx is not None:
                        display_messages[text_msg_idx]["content"] += ev["content"]
                    else:
                        display_messages.append({"role": "assistant", "content": ev["content"], "metadata": {}})
                        text_msg_idx = len(display_messages) - 1

                elif t == "tool_call":
                    drop_spinner()

                    # Finalize any in-flight text message (keep if it has real content)
                    if text_msg_idx is not None:
                        if not display_messages[text_msg_idx].get("content", "").strip():
                            display_messages.pop(text_msg_idx)
                        text_msg_idx = None

                    tool_name = ev["name"]
                    tool_args = ev["arguments"]
                    display_messages.append({
                        "role": "assistant",
                        "content": f"```\n{tool_name}({tool_args})\n```\n⏳ Running...",
                        "metadata": {"title": f"🛠️ Used tool {tool_name}"},
                    })
                    tool_call_idx = len(display_messages) - 1

                elif t == "tool_output":
                    if tool_call_idx is not None:
                        output = ev["content"]
                        shown = output[:500]
                        if len(output) > 500:
                            shown += "\n..."
                        # Prefer the arguments echoed on the output event and
                        # fall back to those of the originating call.
                        call_args = ev.get("arguments", tool_args)
                        display_messages[tool_call_idx]["content"] = (
                            f"```\n{tool_name}({call_args})\n```\n\n"
                            f"**Output:**\n```\n{shown}\n```"
                        )
                        display_messages[tool_call_idx]["metadata"] = {
                            "title": f"🛠️ {tool_name} · {len(output)} chars",
                        }
                        # Only clear tool_call_idx if NOT partial (final output)
                        if not ev.get("partial"):
                            tool_call_idx = None

                elif t == "error":
                    drop_spinner()
                    # Escape the text: it is interpolated into raw HTML.
                    display_messages.append({
                        "role": "assistant",
                        "content": f'<span style="color: var(--color-red-500)">{html.escape(str(ev["content"]))}</span>',
                        "metadata": {"title": "💥 Error"},
                    })
                    # Persist the error bubble, as the exception path below does.
                    ctx["history"] = display_messages
                    yield idle_update()
                    return

                elif t == "done":
                    break

                # Update history in real-time so state always has latest messages
                ctx["history"] = display_messages

                yield {
                    chatbot: gr.update(value=display_messages),
                    state: gr.update(value=state_value),
                }

            # A spinner still showing when the stream ends must not be saved
            # as if it were a real assistant message.
            drop_spinner()

            # Final sync (ensures tool output messages are saved)
            ctx["history"] = display_messages

            yield idle_update()

        except Exception as exc:
            # Top-level boundary for the UI: surface the failure as a chat
            # bubble instead of leaving the interface stuck in streaming mode.
            drop_spinner()
            display_messages.append({
                "role": "assistant",
                "content": f'<span style="color: var(--color-red-400)">{html.escape(str(exc))}</span>',
                "metadata": {"title": "💥 Error"},
            })
            ctx["history"] = display_messages
            yield idle_update()

    @staticmethod
    def new_chat(state_value):
        """Start a fresh conversation.

        Clears the active conversation id in ``state_value`` and returns
        updates that deselect the conversation list, empty the chatbot and
        push the modified state back.
        """
        state_value["conversation_id"] = ""
        cleared_selection = gr.update(value=None)
        cleared_chat = gr.update(value=None)
        return cleared_selection, cleared_chat, gr.update(value=state_value)

    @staticmethod
    def select_conversation(choice, state_value):
        """Switch the chat view to the conversation picked in the sidebar.

        Nothing is updated when no choice is given or when the choice is
        already the active conversation. Otherwise the choice becomes the
        active conversation and its stored history (empty if none is stored)
        is shown in the chatbot.
        """
        if not choice or choice == state_value.get("conversation_id"):
            return gr.skip()

        state_value["conversation_id"] = choice
        contexts = state_value["conversation_contexts"]
        history = contexts[choice].get("history", []) if choice in contexts else []
        return gr.update(value=history), gr.update(value=state_value)

    @staticmethod
    def delete_selected_conversation(choice, state_value):
        """Delete the conversation selected in the sidebar.

        Removes both its stored context and its list entry. If the deleted
        conversation was the active one, the active id is reset and the
        chatbot is cleared; otherwise the chatbot is left untouched. Returns
        updates for the conversation list, the chatbot and the state.
        """
        if not choice:
            return gr.skip()

        contexts = state_value["conversation_contexts"]
        contexts.pop(choice, None)
        state_value["conversations"] = [
            conv for conv in state_value["conversations"] if conv["key"] != choice
        ]

        if state_value.get("conversation_id") == choice:
            # The open conversation is gone: reset the selection and the view.
            state_value["conversation_id"] = ""
            chat_update = gr.update(value=None)
        else:
            chat_update = gr.skip()

        return _conv_choices(state_value), chat_update, gr.update(value=state_value)

    @staticmethod
    def cancel_stream(state_value):
        """Mark the current assistant message as cancelled.

        If the last message of the active conversation is from the assistant,
        a "Chat completion paused" footer is added to its metadata. The Send
        button is shown again and the Stop button hidden.

        Args:
            state_value: Session state dict; the active conversation's last
                message may be mutated in place.

        Returns:
            ``gr.skip()`` when no conversation is active; otherwise a
            4-tuple of updates for (chatbot, state, send_btn, stop_btn).
        """
        conv_id = state_value.get("conversation_id")
        if not conv_id:
            return gr.skip()

        ctx = state_value["conversation_contexts"].get(conv_id)
        if ctx is None:
            # The context is gone (e.g. the conversation was deleted while
            # streaming): nothing to annotate, but still restore the buttons
            # so the UI does not stay stuck on "Stop".
            return (gr.skip(), gr.skip(), gr.update(visible=True), gr.update(visible=False))

        history = ctx.get("history") or []
        if history and history[-1].get("role") == "assistant":
            last = history[-1]
            # Messages may carry no metadata, or an explicit None.
            metadata = last.get("metadata") or {}
            metadata["footer"] = "Chat completion paused"
            last["metadata"] = metadata
        return (gr.update(value=history), gr.update(value=state_value), gr.update(visible=True), gr.update(visible=False))

    @staticmethod
    def load_from_js(serialised_json, state_value):
        """Receive the JSON string that JS_LOAD_STATE returned, merge into state.

        The payload comes from the browser and is therefore untrusted. Empty
        input, undecodable input, and JSON that is not an object are ignored,
        leaving both outputs unchanged. Fields of the wrong type fall back to
        empty containers.

        Args:
            serialised_json: JSON text, expected to be an object with the
                keys "conversations" (list) and "conversation_contexts"
                (object).
            state_value: Session state dict; its "conversations" and
                "conversation_contexts" entries are replaced in place.

        Returns:
            A pair of updates for (conv_choice, state); both are
            ``gr.skip()`` when the payload is ignored.
        """
        import json
        if not serialised_json:
            return gr.skip(), gr.skip()
        try:
            loaded = json.loads(serialised_json)
        except (TypeError, ValueError, RecursionError):
            # Not a string, malformed JSON (JSONDecodeError is a ValueError),
            # or nested too deeply to parse: keep the current state.
            return gr.skip(), gr.skip()
        if not isinstance(loaded, dict):
            # Valid JSON such as null, a list or a number is not a saved state.
            return gr.skip(), gr.skip()

        conversations = loaded.get("conversations")
        contexts = loaded.get("conversation_contexts")
        state_value["conversations"] = conversations if isinstance(conversations, list) else []
        state_value["conversation_contexts"] = contexts if isinstance(contexts, dict) else {}
        return _conv_choices(state_value), gr.update(value=state_value)

    @staticmethod
    def prepare_save(state_value):
        """Serialise state to JSON so JS_SAVE_STATE can write it to localStorage."""
        import json
        return json.dumps({
            "conversations": state_value.get("conversations", []),
            "conversation_contexts": state_value.get("conversation_contexts", {}),
        })

# UI layout: a collapsible sidebar with the conversation list, and a chat
# column holding the landing page, the chatbot and the input row.
with gr.Blocks(fill_width=True, title="OpenMythos Demo") as demo:
    # Per-session state:
    #   conversation_contexts: conversation id -> {"history": [...messages]}
    #   conversations: list of {"label", "key", "last_updated"} sidebar entries
    #   conversation_id: id of the active conversation ("" when none)
    state = gr.State({
        "conversation_contexts": {},
        "conversations": [],
        "conversation_id": "",
    })

    # Hidden textboxes that carry the JSON-serialised state between the Python
    # handlers and the JS_LOAD_STATE / JS_SAVE_STATE snippets.
    js_load_output = gr.Textbox(visible=False, elem_id="js-load-output")
    js_save_input  = gr.Textbox(visible=False, elem_id="js-save-input")

    with gr.Row(elem_id="main-row"):
        with gr.Sidebar(open=False):
            new_chat_btn = gr.Button(
                value="New Conversation",
                variant="primary",
            )
            # Choices are filled in by _conv_choices() as (label, key) pairs.
            conv_choice = gr.Radio(
                choices=[],
                label=None,
                interactive=True,
                elem_id="conversations-radio",
            )
            # NOTE(review): created hidden and never made visible from Python
            # in this file — presumably triggered or revealed by front-end
            # JS/CSS; confirm.
            delete_btn = gr.Button(
                value="Delete Selected",
                variant="stop",
                visible=False
            )

        with gr.Column(elem_id="chat-column"):

            # Landing page shown when chat is empty with added hyperlinks
            landing_page = gr.HTML(
                value="""
                <div id="landing-page">
                    <div class="landing-content">
                        <div class="landing-logo">
                            <img src="/gradio_api/file=static/svg/logo.svg" alt="MythosHarness" width="420" height="70" />
                        </div>
                    </div>
                    <div class="landing-prompt">
                        <p>Made with ❤️ by <a href="http://huggingface.co/KingNish" target="_blank" style="color: var(--primary-500, #ff4b4b); text-decoration: underline;">KingNish</a> and <a href="https://huggingface.co/himanshu17HF" target="_blank" style="color: var(--primary-500, #ff4b4b); text-decoration: underline;">Himanshu</a></p>
                    </div>
                </div>
                """,
                elem_id="landing-page-container",
            )

            chatbot = gr.Chatbot(
                elem_id="chatbot",
                show_label=False,
                buttons=[],
                layout="bubble",
                autoscroll=True
            )
            with gr.Row(elem_id="input-row"):
                msg = gr.Textbox(
                    placeholder="Enter your message here...",
                    show_label=False,
                    scale=4,
                    container=False,
                    max_lines=10,
                )
                send_btn = gr.Button(
                    "Send",
                    variant="primary",
                    scale=0,
                    min_width=40,
                    elem_id="send-btn",
                )
                # Hidden until a response is streaming; stream_response swaps
                # the visibility of send_btn and stop_btn.
                stop_btn = gr.Button(
                    "Stop",
                    variant="stop",
                    scale=0,
                    min_width=40,
                    visible=False,
                    elem_id="stop-btn",
                )

    # --- Sidebar: conversation management ---------------------------------
    new_chat_btn.click(
        fn=GradioEvents.new_chat,
        inputs=[state],
        outputs=[conv_choice, chatbot, state],
    )

    conv_choice.change(
        fn=GradioEvents.select_conversation,
        inputs=[conv_choice, state],
        outputs=[chatbot, state],
    )

    delete_btn.click(
        fn=GradioEvents.delete_selected_conversation,
        inputs=[conv_choice, state],
        outputs=[conv_choice, chatbot, state],
    )

    # --- Chat: sending and stopping ---------------------------------------
    # Clicking Send and pressing Enter in the textbox run the same streaming
    # handler. Both event handles are kept so that Stop can cancel either.
    submit_event = send_btn.click(
        fn=GradioEvents.stream_response,
        inputs=[msg, state],
        outputs=[msg, chatbot, state, conv_choice, send_btn, stop_btn],
    )
    msg_submit_event = msg.submit(
        fn=GradioEvents.stream_response,
        inputs=[msg, state],
        outputs=[msg, chatbot, state, conv_choice, send_btn, stop_btn],
    )

    # Cancelling only the click event would leave a stream that was started
    # with Enter running after Stop is pressed.
    stop_btn.click(
        fn=GradioEvents.cancel_stream,
        inputs=[state],
        outputs=[chatbot, state, send_btn, stop_btn],
        cancels=[submit_event, msg_submit_event],
    )

    # --- Persistence ------------------------------------------------------
    # On every state change: serialise the state into the hidden textbox,
    # then run JS_SAVE_STATE on that textbox's value in the browser.
    state.change(
        fn=GradioEvents.prepare_save,
        inputs=[state],
        outputs=[js_save_input],
    ).then(
        fn=lambda x: x,
        inputs=[js_save_input],
        outputs=[js_save_input],
        js=JS_SAVE_STATE,
    )

    # On page load: run the landing-page script, then JS_LOAD_STATE (its
    # result lands in the hidden js_load_output textbox), then merge that
    # JSON into the session state and refresh the conversation list.
    demo.load(
        fn=lambda: None,
        inputs=None,
        outputs=None,
        js=LANDING_PAGE_SCRIPT,
    ).then(
        fn=lambda x: x,
        inputs=[js_load_output],
        outputs=[js_load_output],
        js=JS_LOAD_STATE,
    ).then(
        fn=GradioEvents.load_from_js,
        inputs=[js_load_output, state],
        outputs=[conv_choice, state],
    )

# Gradio Base theme with radius_size set to "none"; passed to launch() below.
theme = gr.themes.Base(radius_size="none")

if __name__ == "__main__":
    # Enable the request queue (default concurrency limit 100, at most 100
    # queued requests), then start the server with SSR disabled, the custom
    # stylesheet app.css and the theme defined above.
    demo.queue(default_concurrency_limit=100, max_size=100).launch(ssr_mode=False, max_threads=100, css_paths="app.css", theme=theme)