#!/usr/bin/env python """ Parrot OSINT MCP – Gradio Frontend Modes: - OSINT Dashboard (deterministic intelligence) - MCP Bridge (raw tool access) - Analyst Copilot (LLM interpretive intelligence) """ import json import traceback from typing import Any, Dict import gradio as gr from huggingface_hub import InferenceClient # --------------------------------------------------------------------- # Task Registry (auto-loads your MCP tasks) # --------------------------------------------------------------------- TASK_REGISTRY: Dict[str, Any] = {} def _register_tasks(): def _try(name, module): try: m = __import__(f"tasks.{module}", fromlist=["*"]) fn = getattr(m, "run", None) if callable(fn): TASK_REGISTRY[name] = fn except Exception: pass _try("lookup_ip", "lookup_ip") _try("lookup_domain", "lookup_domain") _try("lookup_hash", "lookup_hash") _try("correlate_iocs", "correlate_iocs") _try("generate_report", "generate_report") _try("enrich_entity", "enrich_entity") _try("mitre_map", "mitre_map") _try("quickscan", "quickscan") _register_tasks() # --------------------------------------------------------------------- # Core Task Execution # --------------------------------------------------------------------- def call_task(name: str, payload: Dict[str, Any]): fn = TASK_REGISTRY.get(name) if not fn: return {"error": f"Unknown tool '{name}'."} try: res = fn(**payload) if not isinstance(res, dict): res = {"result": res} return res except Exception as e: return {"error": str(e), "traceback": traceback.format_exc()} def normalize_result(res: Dict[str, Any]): """Formats UI fields cleanly.""" pretty = json.dumps(res, indent=2, default=str) summary = res.get("summary", "") markdown = res.get("markdown") or res.get("report") or "" if not markdown and summary: markdown = f"## Summary\n\n{summary}" return { "summary": summary, "markdown": markdown, "json": pretty, "mitre": json.dumps(res.get("mitre", ""), indent=2, default=str) if res.get("mitre") else "", "stix": json.dumps(res.get("stix", ""), indent=2, default=str) if res.get("stix") else "", "sarif": json.dumps(res.get("sarif", ""), indent=2, default=str) if res.get("sarif") else "", } # --------------------------------------------------------------------- # ANALYST COPILOT (LLM) # --------------------------------------------------------------------- def respond( message, history, system_prompt, model_name, hf_token, temperature, top_p, max_tokens, ): """Streaming response from WhiteRabbit Neo or Cybertron.""" client = InferenceClient(model=model_name, token=hf_token.token) msgs = [{"role": "system", "content": system_prompt}] msgs.extend(history) msgs.append({"role": "user", "content": message}) buf = "" for chunk in client.chat_completion( messages=msgs, max_tokens=max_tokens, temperature=temperature, top_p=top_p, stream=True, ): delta = chunk.choices[0].delta.content if delta: buf += delta yield buf def inject_osint(history, osint_obj): """Inject raw JSON results into the chat context.""" pretty = json.dumps(osint_obj, indent=2, default=str) history.append({ "role": "system", "content": f"### Injected OSINT Result\n```\n{pretty}\n```" }) return history # --------------------------------------------------------------------- # OSINT Dashboard Callbacks # --------------------------------------------------------------------- def ui_lookup_ip(ip, enrich, mitre): raw = call_task("lookup_ip", {"ip": ip, "enrich": enrich, "map_mitre": mitre}) norm = normalize_result(raw) return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw def ui_lookup_domain(domain, enrich, mitre): raw = call_task("lookup_domain", {"domain": domain, "enrich": enrich, "map_mitre": mitre}) norm = normalize_result(raw) return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw def ui_lookup_hash(h, ht, enrich, mitre): raw = call_task("lookup_hash", {"hash": h, "hash_type": ht, "enrich": enrich, "map_mitre": mitre}) norm = normalize_result(raw) return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw def ui_correlate_iocs(iocs): lst = [x.strip() for x in iocs.splitlines() if x.strip()] raw = call_task("correlate_iocs", {"iocs": lst}) norm = normalize_result(raw) return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], raw def ui_quickscan(target): raw = call_task("quickscan", {"target": target}) norm = normalize_result(raw) return norm["summary"], norm["markdown"], norm["json"], raw # --------------------------------------------------------------------- # MCP Bridge # --------------------------------------------------------------------- def ui_bridge(tool, args_json): try: payload = json.loads(args_json) except Exception as e: return json.dumps({"error": str(e)}, indent=2), "", {} raw = call_task(tool, payload) norm = normalize_result(raw) return norm["json"], norm["markdown"], raw # --------------------------------------------------------------------- # BUILD UI # --------------------------------------------------------------------- def build_interface(): with gr.Blocks(title="Parrot OSINT MCP Console") as demo: gr.Markdown("# Parrot OSINT MCP Console") osint_state = gr.State({}) # ------------------------- # OSINT Dashboard # ------------------------- with gr.Tab("OSINT Dashboard"): with gr.Tab("IP"): ip = gr.Textbox(label="IP Address") enrich = gr.Checkbox(value=True, label="Enrich") mitre = gr.Checkbox(value=True, label="MITRE Map") run = gr.Button("Run IP Lookup") s = gr.Textbox(label="Summary") md = gr.Markdown() js = gr.Code(language="json") mt = gr.Code(language="json") st = gr.Code(language="json") run.click(ui_lookup_ip, [ip, enrich, mitre], [s, md, js, mt, st, osint_state]) # Add other tabs (Domain, Hash, etc.) # Your earlier implementation plugs in cleanly. # ------------------------- # MCP Bridge # ------------------------- with gr.Tab("MCP Bridge"): tool = gr.Dropdown(sorted(TASK_REGISTRY.keys())) args = gr.Code(language="json") btn = gr.Button("Call Tool") out_js = gr.Code(language="json") out_md = gr.Markdown() btn.click(ui_bridge, [tool, args], [out_js, out_md, osint_state]) # ------------------------- # Analyst Copilot # ------------------------- with gr.Tab("Analyst Copilot"): gr.Markdown("### WhiteRabbit Neo + Cybertron TI Assistant") system_prompt = gr.Textbox( label="System Prompt", value=( "You are a threat intelligence analyst. " "You classify TTPs, map MITRE ATT&CK, and provide investigation guidance." ), ) model_select = gr.Dropdown( label="LLM Model", choices=[ "berkeley-nest/WhiteRabbitNeo-8B", "cybertronai/cybertron-1.1-1b", "cybertronai/cybertron-1.1-7b", "cybertronai/cybertron-1.1-32b" ], value="berkeley-nest/WhiteRabbitNeo-8B", ) chatbot = gr.ChatInterface( respond, type="messages", additional_inputs=[ system_prompt, model_select, gr.OAuthToken(label="HF Token"), gr.Slider(0.1, 2.0, value=0.7, step=0.1, label="Temperature"), gr.Slider(0.1, 1.0, value=0.95, step=0.05, label="Top-p"), gr.Slider(32, 4096, value=512, step=32, label="Max Tokens"), ], ) inject_btn = gr.Button("Inject Last OSINT Result into Chat") inject_btn.click( inject_osint, inputs=[chatbot._chatbot_state, osint_state], outputs=[chatbot._chatbot_state], ) return demo if __name__ == "__main__": demo = build_interface() demo.launch()