# OSINTMCPServer / app.py
# Author: S-Dreamer — "Update app.py", commit a1cde8b (verified), 4.5 kB
# NOTE(review): the lines above this file's shebang were Hugging Face
# web-page chrome ("raw / history / blame" links) captured by the scrape;
# converted to comments so the file remains valid Python.
#!/usr/bin/env python
"""
Parrot OSINT MCP – Gradio Frontend
Modes:
- OSINT Dashboard (deterministic intelligence)
- MCP Bridge (raw tool access)
- Analyst Copilot (LLM interpretive intelligence)
"""
import importlib
import json
import traceback
from typing import Any, Dict

import gradio as gr
from huggingface_hub import InferenceClient
# ---------------------------------------------------------------------
# Task Registry (auto-loads your MCP tasks)
# ---------------------------------------------------------------------
# Maps tool name -> callable ``run`` entry point. Populated at import time
# by _register_tasks(); tasks whose module fails to import are skipped.
TASK_REGISTRY: Dict[str, Any] = {}

# Tool modules expected under the local ``tasks`` package. Each module name
# doubles as its registry key (the original code passed the same string
# twice for every registration).
_TASK_NAMES = (
    "lookup_ip",
    "lookup_domain",
    "lookup_hash",
    "correlate_iocs",
    "generate_report",
    "enrich_entity",
    "mitre_map",
    "quickscan",
)


def _register_tasks() -> None:
    """Best-effort autoload of MCP task modules into TASK_REGISTRY.

    For every name in _TASK_NAMES, import ``tasks.<name>`` and register its
    callable ``run`` attribute under that name. Missing or broken modules,
    and modules without a callable ``run``, are skipped so the app still
    starts with a partial toolset.
    """
    for name in _TASK_NAMES:
        try:
            module = importlib.import_module(f"tasks.{name}")
        except Exception:
            # Deliberate best-effort: one broken/missing task must not
            # prevent the rest of the app from loading.
            continue
        fn = getattr(module, "run", None)
        if callable(fn):
            TASK_REGISTRY[name] = fn


_register_tasks()
# ---------------------------------------------------------------------
# Core Task Execution
# ---------------------------------------------------------------------
def call_task(name: str, payload: Dict[str, Any]):
    """Invoke a registered MCP task and always return a dict.

    Args:
        name: Registry key of the tool to run (see TASK_REGISTRY).
        payload: Keyword arguments forwarded to the tool's ``run``
            callable. ``None`` is treated as an empty payload (the
            original crashed on ``fn(**None)``).

    Returns:
        The tool's dict result; non-dict results are wrapped as
        ``{"result": value}``. Failures are reported as
        ``{"error": ..., "traceback": ...}`` instead of raising, so the
        UI layer never has to handle exceptions from tools.
    """
    fn = TASK_REGISTRY.get(name)
    if not fn:
        return {"error": f"Unknown tool '{name}'."}
    try:
        # Robustness fix: tolerate payload=None from the UI layer.
        res = fn(**(payload or {}))
    except Exception as e:
        # Boundary handler: surface the failure as data for the UI.
        return {"error": str(e), "traceback": traceback.format_exc()}
    if not isinstance(res, dict):
        res = {"result": res}
    return res
def normalize_result(res: Dict[str, Any]):
    """Shape a raw task-result dict into the fields the Gradio UI expects.

    Returns a dict with ``summary``, ``markdown`` (falls back to
    ``report``, then to a generated "## Summary" section), a pretty-printed
    ``json`` dump of the whole result, and optional ``mitre``/``stix``/
    ``sarif`` artifacts serialized as JSON (empty string when absent).
    """

    def _dump_optional(key: str) -> str:
        # Serialize an optional artifact; "" when the key is missing/falsy.
        value = res.get(key)
        if not value:
            return ""
        return json.dumps(value, indent=2, default=str)

    summary = res.get("summary", "")
    markdown = res.get("markdown") or res.get("report") or ""
    if summary and not markdown:
        markdown = f"## Summary\n\n{summary}"
    return {
        "summary": summary,
        "markdown": markdown,
        "json": json.dumps(res, indent=2, default=str),
        "mitre": _dump_optional("mitre"),
        "stix": _dump_optional("stix"),
        "sarif": _dump_optional("sarif"),
    }
# ---------------------------------------------------------------------
# Analyst Copilot (LLM interpretive intelligence)
# ---------------------------------------------------------------------
# NOTE(review): this fragment is the Analyst Copilot tab body. It references
# names not defined anywhere in the visible portion of the file — `respond`
# (the chat handler), `inject_osint`, `osint_state`, and the enclosing
# `def build_interface():` / `gr.Blocks()` context that `return demo`
# belongs to. The scrape appears to have dropped those definitions (raw file
# is 4.5 kB); restore them before running. Indentation below is
# reconstructed — the original paste had it stripped.
with gr.Tab("Analyst Copilot"):
    gr.Markdown("### WhiteRabbit Neo + Cybertron TI Assistant")
    # User-editable system prompt, forwarded to the chat handler per turn.
    system_prompt = gr.Textbox(
        label="System Prompt",
        value=(
            "You are a threat intelligence analyst. "
            "You classify TTPs, map MITRE ATT&CK, and provide investigation guidance."
        ),
    )
    # Model picker; value is a Hub repo id passed on to the handler.
    model_select = gr.Dropdown(
        label="LLM Model",
        choices=[
            "berkeley-nest/WhiteRabbitNeo-8B",
            "cybertronai/cybertron-1.1-1b",
            "cybertronai/cybertron-1.1-7b",
            "cybertronai/cybertron-1.1-32b"
        ],
        value="berkeley-nest/WhiteRabbitNeo-8B",
    )
    gr.Markdown("### Provide your HuggingFace API Token for the LLM:")
    # OAuth token component; presumably consumed by `respond` to build an
    # InferenceClient — TODO confirm against the missing handler.
    hf_token = gr.OAuthToken()
    chatbot = gr.ChatInterface(
        respond,  # chat handler defined elsewhere in the file (not visible)
        type="messages",
        additional_inputs=[
            system_prompt,
            model_select,
            hf_token,
            gr.Slider(0.1, 2.0, value=0.7, step=0.1, label="Temperature"),
            gr.Slider(0.1, 1.0, value=0.95, step=0.05, label="Top-p"),
            gr.Slider(32, 4096, value=512, step=32, label="Max Tokens"),
        ],
    )
    # Copies the last OSINT dashboard result into the chat history.
    inject_btn = gr.Button("Inject Last OSINT Result into Chat")
    inject_btn.click(
        inject_osint,  # defined elsewhere; takes (chat_state, osint_state)
        # NOTE(review): `_chatbot_state` is a private Gradio attribute and
        # may break across Gradio versions — confirm a public alternative.
        inputs=[chatbot._chatbot_state, osint_state],
        outputs=[chatbot._chatbot_state],
    )
# NOTE(review): `return demo` implies an enclosing `def build_interface():`
# with `demo = gr.Blocks(...)` that is missing from this fragment.
return demo
# Script entry point: construct the Gradio app and start the server.
# `build_interface` is defined elsewhere in the file (not visible in this
# fragment). The name `demo` is kept as-is — Gradio tooling (reload mode /
# Spaces) conventionally looks the app up under that name.
if __name__ == "__main__":
    demo = build_interface()
    demo.launch()