S-Dreamer committed on
Commit
37f1fe0
·
verified ·
1 Parent(s): 1005311

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +137 -162
app.py CHANGED
@@ -3,9 +3,9 @@
3
  Parrot OSINT MCP – Gradio Frontend
4
 
5
  Modes:
6
- - "OSINT Dashboard" (multi-tool, opinionated)
7
- - "MCP Bridge" (raw tool_name + JSON args → JSON result)
8
- - "Analyst Copilot" (streaming LLM with OSINT context injection)
9
  """
10
 
11
  import json
@@ -16,263 +16,238 @@ import gradio as gr
16
  from huggingface_hub import InferenceClient
17
 
18
  # ---------------------------------------------------------------------
19
- # Task registry: adapt this to your actual task API
20
  # ---------------------------------------------------------------------
21
 
22
  TASK_REGISTRY: Dict[str, Any] = {}
23
 
24
- def _register_tasks() -> None:
25
- def _try_register(name: str, module_name: str):
26
  try:
27
- module = __import__(f"tasks.{module_name}", fromlist=["*"])
28
- fn = getattr(module, "run", None)
29
  if callable(fn):
30
  TASK_REGISTRY[name] = fn
31
  except Exception:
32
  pass
33
 
34
- _try_register("lookup_ip", "lookup_ip")
35
- _try_register("lookup_domain", "lookup_domain")
36
- _try_register("lookup_hash", "lookup_hash")
37
- _try_register("correlate_iocs", "correlate_iocs")
38
- _try_register("generate_report", "generate_report")
39
- _try_register("enrich_entity", "enrich_entity")
40
- _try_register("mitre_map", "mitre_map")
41
- _try_register("quickscan", "quickscan")
42
 
43
  _register_tasks()
44
 
45
-
46
  # ---------------------------------------------------------------------
47
- # Core execution helpers
48
  # ---------------------------------------------------------------------
49
 
50
- def call_task(tool_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
51
- fn = TASK_REGISTRY.get(tool_name)
52
  if not fn:
53
- return {
54
- "error": f"Unknown tool '{tool_name}'. Registered tools: {sorted(TASK_REGISTRY.keys())}"
55
- }
56
 
57
  try:
58
- result = fn(**payload)
59
- if not isinstance(result, dict):
60
- result = {"result": result}
61
- return result
62
- except Exception as exc:
63
- return {
64
- "error": f"Exception in tool '{tool_name}': {exc}",
65
- "traceback": traceback.format_exc(),
66
- }
67
-
68
-
69
- def format_result_for_ui(result: Dict[str, Any]) -> Dict[str, str]:
70
- pretty_json = json.dumps(result, indent=2, default=str)
71
-
72
- markdown = result.get("markdown") or result.get("report") or ""
73
- if not markdown and "summary" in result:
74
- markdown = f"## Summary\n\n{result['summary']}"
75
-
76
- mitre = json.dumps(result.get("mitre", ""), indent=2, default=str) if result.get("mitre") else ""
77
- stix = json.dumps(result.get("stix", ""), indent=2, default=str) if result.get("stix") else ""
78
- sarif = json.dumps(result.get("sarif", ""), indent=2, default=str) if result.get("sarif") else ""
79
 
80
  return {
81
- "summary": result.get("summary", ""),
82
  "markdown": markdown,
83
- "json": pretty_json,
84
- "mitre": mitre,
85
- "stix": stix,
86
- "sarif": sarif,
87
  }
88
 
89
-
90
  # ---------------------------------------------------------------------
91
- # MODE C — ANALYST COPILOT (LLM)
92
  # ---------------------------------------------------------------------
93
 
94
- def respond(message, history, system_message, model, hf_token, temperature, top_p, max_tokens):
95
- """
96
- Streaming LLM response using HuggingFace InferenceClient.
97
- Supports injecting OSINT task results into the conversation.
98
- """
99
- client = InferenceClient(
100
- token=hf_token.token,
101
- model=model,
102
- )
103
-
104
- messages = [{"role": "system", "content": system_message}]
105
- messages.extend(history)
106
- messages.append({"role": "user", "content": message})
107
-
108
- response_text = ""
109
-
 
 
110
  for chunk in client.chat_completion(
111
- messages=messages,
 
112
  temperature=temperature,
113
  top_p=top_p,
114
- max_tokens=max_tokens,
115
- stream=True
116
  ):
117
  delta = chunk.choices[0].delta.content
118
  if delta:
119
- response_text += delta
120
- yield response_text
121
-
122
-
123
- def inject_osint_context(history, task_result: Dict[str, Any]):
124
- """
125
- Inject JSON + summary + MITRE mappings directly into the chat history.
126
- """
127
- pretty = json.dumps(task_result, indent=2, default=str)
128
- blob = f"""
129
- ### OSINT Result Injected:
130
-
131
- {pretty}
132
-
133
- """
134
-
135
- history.append({"role": "system", "content": blob})
136
  return history
137
 
138
-
139
  # ---------------------------------------------------------------------
140
- # Dashboard callbacks (Mode B)
141
  # ---------------------------------------------------------------------
142
 
143
  def ui_lookup_ip(ip, enrich, mitre):
144
  raw = call_task("lookup_ip", {"ip": ip, "enrich": enrich, "map_mitre": mitre})
145
- normal = format_result_for_ui(raw)
146
- return normal["summary"], normal["markdown"], normal["json"], normal["mitre"], normal["stix"], raw
147
-
148
 
149
  def ui_lookup_domain(domain, enrich, mitre):
150
  raw = call_task("lookup_domain", {"domain": domain, "enrich": enrich, "map_mitre": mitre})
151
- normal = format_result_for_ui(raw)
152
- return normal["summary"], normal["markdown"], normal["json"], normal["mitre"], normal["stix"], raw
153
-
154
 
155
  def ui_lookup_hash(h, ht, enrich, mitre):
156
  raw = call_task("lookup_hash", {"hash": h, "hash_type": ht, "enrich": enrich, "map_mitre": mitre})
157
- normal = format_result_for_ui(raw)
158
- return normal["summary"], normal["markdown"], normal["json"], normal["mitre"], normal["stix"], raw
159
-
160
 
161
  def ui_correlate_iocs(iocs):
162
- parsed = [l.strip() for l in iocs.splitlines() if l.strip()]
163
- raw = call_task("correlate_iocs", {"iocs": parsed})
164
- normal = format_result_for_ui(raw)
165
- return normal["summary"], normal["markdown"], normal["json"], normal["mitre"], raw
166
-
167
 
168
  def ui_quickscan(target):
169
  raw = call_task("quickscan", {"target": target})
170
- normal = format_result_for_ui(raw)
171
- return normal["summary"], normal["markdown"], normal["json"], raw
172
-
173
 
174
  # ---------------------------------------------------------------------
175
- # MCP Bridge (Mode D)
176
  # ---------------------------------------------------------------------
177
 
178
- def ui_mcp_bridge(tool, args_json):
179
  try:
180
  payload = json.loads(args_json)
181
- except Exception as exc:
182
- err = {"error": f"Invalid JSON: {exc}"}
183
- return json.dumps(err, indent=2), "", err
184
-
185
  raw = call_task(tool, payload)
186
- normal = format_result_for_ui(raw)
187
- return normal["json"], normal["markdown"], raw
188
-
189
 
190
  # ---------------------------------------------------------------------
191
- # UI — Now with Analyst Copilot
192
  # ---------------------------------------------------------------------
193
 
194
- def build_interface() -> gr.Blocks:
195
  with gr.Blocks(title="Parrot OSINT MCP Console") as demo:
196
  gr.Markdown("# Parrot OSINT MCP Console")
197
 
198
- # Store OSINT task results for injection into the Copilot
199
- osint_result_state = gr.State([])
200
 
201
- # ------------------------------------------
202
- # MODE B — Dashboard
203
- # ------------------------------------------
204
  with gr.Tab("OSINT Dashboard"):
205
- with gr.Tab("IP Lookup"):
206
  ip = gr.Textbox(label="IP Address")
207
- enrich = gr.Checkbox(value=True, label="Enrichment")
208
- mitre = gr.Checkbox(value=True, label="MITRE mapping")
209
- btn = gr.Button("Run")
210
- summary = gr.Textbox(label="Summary")
 
211
  md = gr.Markdown()
212
  js = gr.Code(language="json")
213
  mt = gr.Code(language="json")
214
  st = gr.Code(language="json")
215
 
216
- btn.click(
217
- ui_lookup_ip,
218
- inputs=[ip, enrich, mitre],
219
- outputs=[summary, md, js, mt, st, osint_result_state],
220
- )
221
 
222
- # You already know: similar tabs for domain, hash, correlation, quickscan
223
- # (keeping focus on Copilot integration)
224
 
225
- # ------------------------------------------
226
- # MODE D — MCP Bridge
227
- # ------------------------------------------
228
  with gr.Tab("MCP Bridge"):
229
  tool = gr.Dropdown(sorted(TASK_REGISTRY.keys()))
230
- args = gr.Code(label="Args JSON")
 
231
  out_js = gr.Code(language="json")
232
  out_md = gr.Markdown()
233
 
234
- bridge_btn = gr.Button("Call Tool")
235
- bridge_btn.click(
236
- ui_mcp_bridge,
237
- inputs=[tool, args],
238
- outputs=[out_js, out_md, osint_result_state],
239
- )
240
 
241
- # ------------------------------------------
242
- # MODE C — Analyst Copilot
243
- # ------------------------------------------
244
  with gr.Tab("Analyst Copilot"):
245
- gr.Markdown("### Streaming TI Assistant with OSINT Context Injection")
246
 
247
- system_msg = gr.Textbox(
248
  label="System Prompt",
249
- value=("You are a threat intelligence analyst. "
250
- "You think slowly, explain clearly, identify TTPs, "
251
- "and recommend next investigative steps."),
 
252
  )
253
 
254
- model = gr.Textbox(
255
- label="HF Model (e.g., openai/gpt-oss-20b)",
256
- value="openai/gpt-oss-20b",
 
 
 
 
 
 
257
  )
258
 
259
  chatbot = gr.ChatInterface(
260
  respond,
 
261
  additional_inputs=[
262
- system_msg,
263
- model,
264
  gr.OAuthToken(label="HF Token"),
265
  gr.Slider(0.1, 2.0, value=0.7, step=0.1, label="Temperature"),
266
  gr.Slider(0.1, 1.0, value=0.95, step=0.05, label="Top-p"),
267
- gr.Slider(1, 2048, value=512, step=1, label="Max Tokens"),
268
  ],
269
- type="messages",
270
  )
271
 
272
- inject_btn = gr.Button("Inject Latest OSINT Result")
273
  inject_btn.click(
274
- inject_osint_context,
275
- inputs=[chatbot._chatbot_state, osint_result_state],
276
  outputs=[chatbot._chatbot_state],
277
  )
278
 
 
3
  Parrot OSINT MCP – Gradio Frontend
4
 
5
  Modes:
6
+ - OSINT Dashboard (deterministic intelligence)
7
+ - MCP Bridge (raw tool access)
8
+ - Analyst Copilot (LLM interpretive intelligence)
9
  """
10
 
11
  import json
 
16
  from huggingface_hub import InferenceClient
17
 
18
  # ---------------------------------------------------------------------
19
+ # Task Registry (auto-loads your MCP tasks)
20
  # ---------------------------------------------------------------------
21
 
22
  TASK_REGISTRY: Dict[str, Any] = {}
23
 
24
+ def _register_tasks():
25
+ def _try(name, module):
26
  try:
27
+ m = __import__(f"tasks.{module}", fromlist=["*"])
28
+ fn = getattr(m, "run", None)
29
  if callable(fn):
30
  TASK_REGISTRY[name] = fn
31
  except Exception:
32
  pass
33
 
34
+ _try("lookup_ip", "lookup_ip")
35
+ _try("lookup_domain", "lookup_domain")
36
+ _try("lookup_hash", "lookup_hash")
37
+ _try("correlate_iocs", "correlate_iocs")
38
+ _try("generate_report", "generate_report")
39
+ _try("enrich_entity", "enrich_entity")
40
+ _try("mitre_map", "mitre_map")
41
+ _try("quickscan", "quickscan")
42
 
43
  _register_tasks()
44
 
 
45
  # ---------------------------------------------------------------------
46
+ # Core Task Execution
47
  # ---------------------------------------------------------------------
48
 
49
+ def call_task(name: str, payload: Dict[str, Any]):
50
+ fn = TASK_REGISTRY.get(name)
51
  if not fn:
52
+ return {"error": f"Unknown tool '{name}'."}
 
 
53
 
54
  try:
55
+ res = fn(**payload)
56
+ if not isinstance(res, dict):
57
+ res = {"result": res}
58
+ return res
59
+ except Exception as e:
60
+ return {"error": str(e), "traceback": traceback.format_exc()}
61
+
62
+
63
+ def normalize_result(res: Dict[str, Any]):
64
+ """Formats UI fields cleanly."""
65
+ pretty = json.dumps(res, indent=2, default=str)
66
+ summary = res.get("summary", "")
67
+ markdown = res.get("markdown") or res.get("report") or ""
68
+ if not markdown and summary:
69
+ markdown = f"## Summary\n\n{summary}"
 
 
 
 
 
 
70
 
71
  return {
72
+ "summary": summary,
73
  "markdown": markdown,
74
+ "json": pretty,
75
+ "mitre": json.dumps(res.get("mitre", ""), indent=2, default=str) if res.get("mitre") else "",
76
+ "stix": json.dumps(res.get("stix", ""), indent=2, default=str) if res.get("stix") else "",
77
+ "sarif": json.dumps(res.get("sarif", ""), indent=2, default=str) if res.get("sarif") else "",
78
  }
79
 
 
80
  # ---------------------------------------------------------------------
81
+ # ANALYST COPILOT (LLM)
82
  # ---------------------------------------------------------------------
83
 
84
+ def respond(
85
+ message,
86
+ history,
87
+ system_prompt,
88
+ model_name,
89
+ hf_token,
90
+ temperature,
91
+ top_p,
92
+ max_tokens,
93
+ ):
94
+ """Streaming response from WhiteRabbit Neo or Cybertron."""
95
+ client = InferenceClient(model=model_name, token=hf_token.token)
96
+
97
+ msgs = [{"role": "system", "content": system_prompt}]
98
+ msgs.extend(history)
99
+ msgs.append({"role": "user", "content": message})
100
+
101
+ buf = ""
102
  for chunk in client.chat_completion(
103
+ messages=msgs,
104
+ max_tokens=max_tokens,
105
  temperature=temperature,
106
  top_p=top_p,
107
+ stream=True,
 
108
  ):
109
  delta = chunk.choices[0].delta.content
110
  if delta:
111
+ buf += delta
112
+ yield buf
113
+
114
+ def inject_osint(history, osint_obj):
115
+ """Inject raw JSON results into the chat context."""
116
+ pretty = json.dumps(osint_obj, indent=2, default=str)
117
+ history.append({
118
+ "role": "system",
119
+ "content": f"### Injected OSINT Result\n```\n{pretty}\n```"
120
+ })
 
 
 
 
 
 
 
121
  return history
122
 
 
123
  # ---------------------------------------------------------------------
124
+ # OSINT Dashboard Callbacks
125
  # ---------------------------------------------------------------------
126
 
127
  def ui_lookup_ip(ip, enrich, mitre):
128
  raw = call_task("lookup_ip", {"ip": ip, "enrich": enrich, "map_mitre": mitre})
129
+ norm = normalize_result(raw)
130
+ return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw
 
131
 
132
  def ui_lookup_domain(domain, enrich, mitre):
133
  raw = call_task("lookup_domain", {"domain": domain, "enrich": enrich, "map_mitre": mitre})
134
+ norm = normalize_result(raw)
135
+ return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw
 
136
 
137
  def ui_lookup_hash(h, ht, enrich, mitre):
138
  raw = call_task("lookup_hash", {"hash": h, "hash_type": ht, "enrich": enrich, "map_mitre": mitre})
139
+ norm = normalize_result(raw)
140
+ return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw
 
141
 
142
  def ui_correlate_iocs(iocs):
143
+ lst = [x.strip() for x in iocs.splitlines() if x.strip()]
144
+ raw = call_task("correlate_iocs", {"iocs": lst})
145
+ norm = normalize_result(raw)
146
+ return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], raw
 
147
 
148
  def ui_quickscan(target):
149
  raw = call_task("quickscan", {"target": target})
150
+ norm = normalize_result(raw)
151
+ return norm["summary"], norm["markdown"], norm["json"], raw
 
152
 
153
  # ---------------------------------------------------------------------
154
+ # MCP Bridge
155
  # ---------------------------------------------------------------------
156
 
157
+ def ui_bridge(tool, args_json):
158
  try:
159
  payload = json.loads(args_json)
160
+ except Exception as e:
161
+ return json.dumps({"error": str(e)}, indent=2), "", {}
 
 
162
  raw = call_task(tool, payload)
163
+ norm = normalize_result(raw)
164
+ return norm["json"], norm["markdown"], raw
 
165
 
166
  # ---------------------------------------------------------------------
167
+ # BUILD UI
168
  # ---------------------------------------------------------------------
169
 
170
+ def build_interface():
171
  with gr.Blocks(title="Parrot OSINT MCP Console") as demo:
172
  gr.Markdown("# Parrot OSINT MCP Console")
173
 
174
+ osint_state = gr.State({})
 
175
 
176
+ # -------------------------
177
+ # OSINT Dashboard
178
+ # -------------------------
179
  with gr.Tab("OSINT Dashboard"):
180
+ with gr.Tab("IP"):
181
  ip = gr.Textbox(label="IP Address")
182
+ enrich = gr.Checkbox(value=True, label="Enrich")
183
+ mitre = gr.Checkbox(value=True, label="MITRE Map")
184
+ run = gr.Button("Run IP Lookup")
185
+
186
+ s = gr.Textbox(label="Summary")
187
  md = gr.Markdown()
188
  js = gr.Code(language="json")
189
  mt = gr.Code(language="json")
190
  st = gr.Code(language="json")
191
 
192
+ run.click(ui_lookup_ip, [ip, enrich, mitre], [s, md, js, mt, st, osint_state])
 
 
 
 
193
 
194
+ # Add other tabs (Domain, Hash, etc.)
195
+ # Your earlier implementation plugs in cleanly.
196
 
197
+ # -------------------------
198
+ # MCP Bridge
199
+ # -------------------------
200
  with gr.Tab("MCP Bridge"):
201
  tool = gr.Dropdown(sorted(TASK_REGISTRY.keys()))
202
+ args = gr.Code(language="json")
203
+ btn = gr.Button("Call Tool")
204
  out_js = gr.Code(language="json")
205
  out_md = gr.Markdown()
206
 
207
+ btn.click(ui_bridge, [tool, args], [out_js, out_md, osint_state])
 
 
 
 
 
208
 
209
+ # -------------------------
210
+ # Analyst Copilot
211
+ # -------------------------
212
  with gr.Tab("Analyst Copilot"):
213
+ gr.Markdown("### WhiteRabbit Neo + Cybertron TI Assistant")
214
 
215
+ system_prompt = gr.Textbox(
216
  label="System Prompt",
217
+ value=(
218
+ "You are a threat intelligence analyst. "
219
+ "You classify TTPs, map MITRE ATT&CK, and provide investigation guidance."
220
+ ),
221
  )
222
 
223
+ model_select = gr.Dropdown(
224
+ label="LLM Model",
225
+ choices=[
226
+ "berkeley-nest/WhiteRabbitNeo-8B",
227
+ "cybertronai/cybertron-1.1-1b",
228
+ "cybertronai/cybertron-1.1-7b",
229
+ "cybertronai/cybertron-1.1-32b"
230
+ ],
231
+ value="berkeley-nest/WhiteRabbitNeo-8B",
232
  )
233
 
234
  chatbot = gr.ChatInterface(
235
  respond,
236
+ type="messages",
237
  additional_inputs=[
238
+ system_prompt,
239
+ model_select,
240
  gr.OAuthToken(label="HF Token"),
241
  gr.Slider(0.1, 2.0, value=0.7, step=0.1, label="Temperature"),
242
  gr.Slider(0.1, 1.0, value=0.95, step=0.05, label="Top-p"),
243
+ gr.Slider(32, 4096, value=512, step=32, label="Max Tokens"),
244
  ],
 
245
  )
246
 
247
+ inject_btn = gr.Button("Inject Last OSINT Result into Chat")
248
  inject_btn.click(
249
+ inject_osint,
250
+ inputs=[chatbot._chatbot_state, osint_state],
251
  outputs=[chatbot._chatbot_state],
252
  )
253