OSINTMCPServer / app.py
S-Dreamer's picture
Update app.py
e00fc58 verified
raw
history blame
9.86 kB
#!/usr/bin/env python
"""
Parrot OSINT MCP – Gradio Frontend
Modes:
- "OSINT Dashboard" (multi-tool, opinionated)
- "MCP Bridge" (raw tool_name + JSON args β†’ JSON result)
- "Analyst Copilot" (streaming LLM with OSINT context injection)
"""
import json
import traceback
from typing import Any, Dict
import gradio as gr
from huggingface_hub import InferenceClient
# ---------------------------------------------------------------------
# Task registry: adapt this to your actual task API
# ---------------------------------------------------------------------
TASK_REGISTRY: Dict[str, Any] = {}
def _register_tasks() -> None:
def _try_register(name: str, module_name: str):
try:
module = __import__(f"tasks.{module_name}", fromlist=["*"])
fn = getattr(module, "run", None)
if callable(fn):
TASK_REGISTRY[name] = fn
except Exception:
pass
_try_register("lookup_ip", "lookup_ip")
_try_register("lookup_domain", "lookup_domain")
_try_register("lookup_hash", "lookup_hash")
_try_register("correlate_iocs", "correlate_iocs")
_try_register("generate_report", "generate_report")
_try_register("enrich_entity", "enrich_entity")
_try_register("mitre_map", "mitre_map")
_try_register("quickscan", "quickscan")
_register_tasks()
# ---------------------------------------------------------------------
# Core execution helpers
# ---------------------------------------------------------------------
def call_task(tool_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
fn = TASK_REGISTRY.get(tool_name)
if not fn:
return {
"error": f"Unknown tool '{tool_name}'. Registered tools: {sorted(TASK_REGISTRY.keys())}"
}
try:
result = fn(**payload)
if not isinstance(result, dict):
result = {"result": result}
return result
except Exception as exc:
return {
"error": f"Exception in tool '{tool_name}': {exc}",
"traceback": traceback.format_exc(),
}
def format_result_for_ui(result: Dict[str, Any]) -> Dict[str, str]:
pretty_json = json.dumps(result, indent=2, default=str)
markdown = result.get("markdown") or result.get("report") or ""
if not markdown and "summary" in result:
markdown = f"## Summary\n\n{result['summary']}"
mitre = json.dumps(result.get("mitre", ""), indent=2, default=str) if result.get("mitre") else ""
stix = json.dumps(result.get("stix", ""), indent=2, default=str) if result.get("stix") else ""
sarif = json.dumps(result.get("sarif", ""), indent=2, default=str) if result.get("sarif") else ""
return {
"summary": result.get("summary", ""),
"markdown": markdown,
"json": pretty_json,
"mitre": mitre,
"stix": stix,
"sarif": sarif,
}
# ---------------------------------------------------------------------
# MODE C β€” ANALYST COPILOT (LLM)
# ---------------------------------------------------------------------
def respond(message, history, system_message, model, hf_token, temperature, top_p, max_tokens):
"""
Streaming LLM response using HuggingFace InferenceClient.
Supports injecting OSINT task results into the conversation.
"""
client = InferenceClient(
token=hf_token.token,
model=model,
)
messages = [{"role": "system", "content": system_message}]
messages.extend(history)
messages.append({"role": "user", "content": message})
response_text = ""
for chunk in client.chat_completion(
messages=messages,
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens,
stream=True
):
delta = chunk.choices[0].delta.content
if delta:
response_text += delta
yield response_text
def inject_osint_context(history, task_result: Dict[str, Any]):
"""
Inject JSON + summary + MITRE mappings directly into the chat history.
"""
pretty = json.dumps(task_result, indent=2, default=str)
blob = f"""
### OSINT Result Injected:
{pretty}
"""
history.append({"role": "system", "content": blob})
return history
# ---------------------------------------------------------------------
# Dashboard callbacks (Mode B)
# ---------------------------------------------------------------------
def ui_lookup_ip(ip, enrich, mitre):
raw = call_task("lookup_ip", {"ip": ip, "enrich": enrich, "map_mitre": mitre})
normal = format_result_for_ui(raw)
return normal["summary"], normal["markdown"], normal["json"], normal["mitre"], normal["stix"], raw
def ui_lookup_domain(domain, enrich, mitre):
raw = call_task("lookup_domain", {"domain": domain, "enrich": enrich, "map_mitre": mitre})
normal = format_result_for_ui(raw)
return normal["summary"], normal["markdown"], normal["json"], normal["mitre"], normal["stix"], raw
def ui_lookup_hash(h, ht, enrich, mitre):
raw = call_task("lookup_hash", {"hash": h, "hash_type": ht, "enrich": enrich, "map_mitre": mitre})
normal = format_result_for_ui(raw)
return normal["summary"], normal["markdown"], normal["json"], normal["mitre"], normal["stix"], raw
def ui_correlate_iocs(iocs):
parsed = [l.strip() for l in iocs.splitlines() if l.strip()]
raw = call_task("correlate_iocs", {"iocs": parsed})
normal = format_result_for_ui(raw)
return normal["summary"], normal["markdown"], normal["json"], normal["mitre"], raw
def ui_quickscan(target):
raw = call_task("quickscan", {"target": target})
normal = format_result_for_ui(raw)
return normal["summary"], normal["markdown"], normal["json"], raw
# ---------------------------------------------------------------------
# MCP Bridge (Mode D)
# ---------------------------------------------------------------------
def ui_mcp_bridge(tool, args_json):
try:
payload = json.loads(args_json)
except Exception as exc:
err = {"error": f"Invalid JSON: {exc}"}
return json.dumps(err, indent=2), "", err
raw = call_task(tool, payload)
normal = format_result_for_ui(raw)
return normal["json"], normal["markdown"], raw
# ---------------------------------------------------------------------
# UI β€” Now with Analyst Copilot
# ---------------------------------------------------------------------
def build_interface() -> gr.Blocks:
with gr.Blocks(title="Parrot OSINT MCP Console") as demo:
gr.Markdown("# Parrot OSINT MCP Console")
# Store OSINT task results for injection into the Copilot
osint_result_state = gr.State([])
# ------------------------------------------
# MODE B β€” Dashboard
# ------------------------------------------
with gr.Tab("OSINT Dashboard"):
with gr.Tab("IP Lookup"):
ip = gr.Textbox(label="IP Address")
enrich = gr.Checkbox(value=True, label="Enrichment")
mitre = gr.Checkbox(value=True, label="MITRE mapping")
btn = gr.Button("Run")
summary = gr.Textbox(label="Summary")
md = gr.Markdown()
js = gr.Code(language="json")
mt = gr.Code(language="json")
st = gr.Code(language="json")
btn.click(
ui_lookup_ip,
inputs=[ip, enrich, mitre],
outputs=[summary, md, js, mt, st, osint_result_state],
)
# You already know: similar tabs for domain, hash, correlation, quickscan
# (keeping focus on Copilot integration)
# ------------------------------------------
# MODE D β€” MCP Bridge
# ------------------------------------------
with gr.Tab("MCP Bridge"):
tool = gr.Dropdown(sorted(TASK_REGISTRY.keys()))
args = gr.Code(label="Args JSON")
out_js = gr.Code(language="json")
out_md = gr.Markdown()
bridge_btn = gr.Button("Call Tool")
bridge_btn.click(
ui_mcp_bridge,
inputs=[tool, args],
outputs=[out_js, out_md, osint_result_state],
)
# ------------------------------------------
# MODE C β€” Analyst Copilot
# ------------------------------------------
with gr.Tab("Analyst Copilot"):
gr.Markdown("### Streaming TI Assistant with OSINT Context Injection")
system_msg = gr.Textbox(
label="System Prompt",
value=("You are a threat intelligence analyst. "
"You think slowly, explain clearly, identify TTPs, "
"and recommend next investigative steps."),
)
model = gr.Textbox(
label="HF Model (e.g., openai/gpt-oss-20b)",
value="openai/gpt-oss-20b",
)
chatbot = gr.ChatInterface(
respond,
additional_inputs=[
system_msg,
model,
gr.OAuthToken(label="HF Token"),
gr.Slider(0.1, 2.0, value=0.7, step=0.1, label="Temperature"),
gr.Slider(0.1, 1.0, value=0.95, step=0.05, label="Top-p"),
gr.Slider(1, 2048, value=512, step=1, label="Max Tokens"),
],
type="messages",
)
inject_btn = gr.Button("Inject Latest OSINT Result")
inject_btn.click(
inject_osint_context,
inputs=[chatbot._chatbot_state, osint_result_state],
outputs=[chatbot._chatbot_state],
)
return demo
if __name__ == "__main__":
demo = build_interface()
demo.launch()