File size: 4,503 Bytes
866d1e1
 
 
 
 
37f1fe0
 
 
866d1e1
 
 
 
 
 
52d5899
 
 
866d1e1
37f1fe0
866d1e1
 
 
 
37f1fe0
 
866d1e1
37f1fe0
 
866d1e1
 
 
 
 
37f1fe0
 
 
 
 
 
 
 
866d1e1
 
 
 
37f1fe0
866d1e1
 
37f1fe0
 
866d1e1
37f1fe0
866d1e1
 
37f1fe0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
866d1e1
 
37f1fe0
866d1e1
37f1fe0
 
 
 
866d1e1
 
 
37f1fe0
866d1e1
 
37f1fe0
 
 
e00fc58
37f1fe0
e00fc58
37f1fe0
e00fc58
37f1fe0
 
 
 
e00fc58
 
37f1fe0
 
 
 
 
 
 
 
 
e00fc58
 
a1cde8b
 
 
e00fc58
 
37f1fe0
e00fc58
37f1fe0
 
a1cde8b
e00fc58
 
37f1fe0
e00fc58
 
 
37f1fe0
e00fc58
37f1fe0
 
e00fc58
 
 
a1cde8b
e00fc58
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python
"""
Parrot OSINT MCP – Gradio Frontend

Modes:
- OSINT Dashboard (deterministic intelligence)
- MCP Bridge (raw tool access)
- Analyst Copilot (LLM interpretive intelligence)
"""

import importlib
import json
import logging
import traceback
from typing import Any, Dict

import gradio as gr
from huggingface_hub import InferenceClient

# ---------------------------------------------------------------------
# Task Registry (auto-loads your MCP tasks)
# ---------------------------------------------------------------------

TASK_REGISTRY: Dict[str, Any] = {}


def _register_tasks():
    """Fill ``TASK_REGISTRY`` with the ``run`` callable of every loadable task.

    Each task name maps to the module ``tasks.<name>``. A task is registered
    only when that module imports cleanly and exposes a callable ``run``
    attribute. Registration is best-effort: a task that cannot be loaded is
    skipped so the remaining tasks still register, but the reason is logged
    instead of being silently discarded.
    """
    log = logging.getLogger(__name__)

    def _try(name, module):
        # A failing task module must never abort start-up, hence the
        # deliberately broad catch; the traceback goes to the log so the
        # failure is still diagnosable.
        try:
            loaded = importlib.import_module(f"tasks.{module}")
        except Exception:
            log.warning(
                "Task %r not registered: importing tasks.%s failed",
                name,
                module,
                exc_info=True,
            )
            return

        fn = getattr(loaded, "run", None)
        if not callable(fn):
            log.warning(
                "Task %r not registered: tasks.%s has no callable 'run'",
                name,
                module,
            )
            return
        TASK_REGISTRY[name] = fn

    # Registry key and module name are identical for every built-in task.
    for task in (
        "lookup_ip",
        "lookup_domain",
        "lookup_hash",
        "correlate_iocs",
        "generate_report",
        "enrich_entity",
        "mitre_map",
        "quickscan",
    ):
        _try(task, task)


_register_tasks()

# ---------------------------------------------------------------------
# Core Task Execution
# ---------------------------------------------------------------------

def call_task(name: str, payload: Dict[str, Any]):
    """Run the registered task ``name`` with ``payload`` as keyword arguments.

    Always returns a dict:
    - ``{"error": ...}`` when no task is registered under ``name``;
    - the task's own result when it is already a dict;
    - ``{"result": value}`` wrapping any other return value;
    - ``{"error": ..., "traceback": ...}`` when the task raises.
    """
    handler = TASK_REGISTRY.get(name)
    if not handler:
        return {"error": f"Unknown tool '{name}'."}

    try:
        outcome = handler(**payload)
        # Non-dict results are wrapped so callers can rely on a dict.
        return outcome if isinstance(outcome, dict) else {"result": outcome}
    except Exception as exc:
        # Failures are reported as data rather than raised to the caller.
        return {"error": str(exc), "traceback": traceback.format_exc()}


def normalize_result(res: Dict[str, Any]):
    """Turn a raw task result into the string fields shown in the UI.

    Returns a dict with the keys ``summary``, ``markdown``, ``json``,
    ``mitre``, ``stix`` and ``sarif``. ``json`` is the whole result
    pretty-printed; ``markdown`` falls back from ``markdown`` to ``report``
    to a generated summary section; the last three are pretty-printed JSON
    when present and truthy, otherwise an empty string.
    """

    def _dump(value):
        # default=str keeps non-JSON-serialisable values from raising.
        return json.dumps(value, indent=2, default=str)

    fields = {"json": _dump(res)}

    summary_text = res.get("summary", "")
    body = (
        res.get("markdown")
        or res.get("report")
        or (f"## Summary\n\n{summary_text}" if summary_text else "")
    )

    optional = {}
    for key in ("mitre", "stix", "sarif"):
        value = res.get(key)
        optional[key] = _dump(value) if value else ""

    # Assemble in the same key order the UI has always received.
    return {
        "summary": summary_text,
        "markdown": body,
        "json": fields["json"],
        **optional,
    }

# ---------------------------------------------------------------------
# ANALYST COPILOT (LLM)
# ---------------------------------------------------------------------

        # -------------------------
        # Analyst Copilot
        # -------------------------
        with gr.Tab("Analyst Copilot"):
            gr.Markdown("### WhiteRabbit Neo + Cybertron TI Assistant")

            system_prompt = gr.Textbox(
                label="System Prompt",
                value=(
                    "You are a threat intelligence analyst. "
                    "You classify TTPs, map MITRE ATT&CK, and provide investigation guidance."
                ),
            )

            model_select = gr.Dropdown(
                label="LLM Model",
                choices=[
                    "berkeley-nest/WhiteRabbitNeo-8B",
                    "cybertronai/cybertron-1.1-1b",
                    "cybertronai/cybertron-1.1-7b",
                    "cybertronai/cybertron-1.1-32b"
                ],
                value="berkeley-nest/WhiteRabbitNeo-8B",
            )

            gr.Markdown("### Provide your HuggingFace API Token for the LLM:")
            hf_token = gr.OAuthToken()

            chatbot = gr.ChatInterface(
                respond,
                type="messages",
                additional_inputs=[
                    system_prompt,
                    model_select,
                    hf_token,
                    gr.Slider(0.1, 2.0, value=0.7, step=0.1, label="Temperature"),
                    gr.Slider(0.1, 1.0, value=0.95, step=0.05, label="Top-p"),
                    gr.Slider(32, 4096, value=512, step=32, label="Max Tokens"),
                ],
            )

            inject_btn = gr.Button("Inject Last OSINT Result into Chat")
            inject_btn.click(
                inject_osint,
                inputs=[chatbot._chatbot_state, osint_state],
                outputs=[chatbot._chatbot_state],
            )

            return demo


# Script entry point: runs only when the file is executed directly, not on import.
# NOTE(review): build_interface() is not defined in the visible part of this
# file - confirm it exists and returns the object whose launch() starts the UI.
if __name__ == "__main__":
    demo = build_interface()
    demo.launch()