#!/usr/bin/env python """ Parrot OSINT MCP – Gradio Frontend Modes: - OSINT Dashboard (deterministic intelligence) - MCP Bridge (raw tool access) - Analyst Copilot (LLM interpretive intelligence) """ import json import traceback from typing import Any, Dict import gradio as gr from huggingface_hub import InferenceClient # --------------------------------------------------------------------- # Task Registry (auto-loads MCP tasks dynamically) # --------------------------------------------------------------------- TASK_REGISTRY: Dict[str, Any] = {} def _register_tasks(): """ Import tasks.* modules dynamically and pull their run() functions. Missing modules are ignored so the UI can still boot. """ def _try(name: str, module: str): try: m = __import__(f"tasks.{module}", fromlist=["*"]) fn = getattr(m, "run", None) if callable(fn): TASK_REGISTRY[name] = fn except Exception: # In Spaces, you might not have all tasks yet; that's fine. pass _try("lookup_ip", "lookup_ip") _try("lookup_domain", "lookup_domain") _try("lookup_hash", "lookup_hash") _try("correlate_iocs", "correlate_iocs") _try("generate_report", "generate_report") _try("enrich_entity", "enrich_entity") _try("mitre_map", "mitre_map") _try("quickscan", "quickscan") _register_tasks() # --------------------------------------------------------------------- # Task Execution + Normalization # --------------------------------------------------------------------- def call_task(name: str, payload: Dict[str, Any]) -> Dict[str, Any]: fn = TASK_REGISTRY.get(name) if not fn: return {"error": f"Unknown tool '{name}'."} try: res = fn(**payload) if not isinstance(res, dict): res = {"result": res} return res except Exception as e: return {"error": str(e), "traceback": traceback.format_exc()} def normalize_result(res: Dict[str, Any]) -> Dict[str, str]: """Ensures consistent UI formatting.""" pretty = json.dumps(res, indent=2, default=str) summary = res.get("summary", "") markdown = res.get("markdown") or res.get("report") or "" if not markdown and summary: markdown = f"## Summary\n\n{summary}" def safe_json(value: Any) -> str: return json.dumps(value, indent=2, default=str) if value else "" return { "summary": summary, "markdown": markdown, "json": pretty, "mitre": safe_json(res.get("mitre")), "stix": safe_json(res.get("stix")), "sarif": safe_json(res.get("sarif")), } # --------------------------------------------------------------------- # Analyst Copilot LLM # --------------------------------------------------------------------- def respond( message, history, system_prompt, model_name, hf_token, temperature, top_p, max_tokens, ): """ Streaming LLM output using WhiteRabbit Neo or Cybertron. `hf_token` is a raw string entered by the user. """ client = InferenceClient( model=model_name, token=hf_token, # Direct string token ) msgs = [{"role": "system", "content": system_prompt}] msgs.extend(history) msgs.append({"role": "user", "content": message}) buffer = "" for chunk in client.chat_completion( messages=msgs, max_tokens=max_tokens, temperature=temperature, top_p=top_p, stream=True, ): delta = chunk.choices[0].delta.content if delta: buffer += delta yield buffer def inject_osint(history, osint_obj): """ Inject OSINT result JSON into the copilot's chat history as a system message. `history` is the ChatInterface state (list of messages). """ pretty = json.dumps(osint_obj, indent=2, default=str) history.append( { "role": "system", "content": f"### Injected OSINT Result\n```\n{pretty}\n```", } ) return history # --------------------------------------------------------------------- # OSINT Dashboard Callbacks # --------------------------------------------------------------------- def ui_lookup_ip(ip, enrich, mitre): raw = call_task("lookup_ip", {"ip": ip, "enrich": enrich, "map_mitre": mitre}) norm = normalize_result(raw) return ( norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw, ) def ui_lookup_domain(domain, enrich, mitre): raw = call_task( "lookup_domain", {"domain": domain, "enrich": enrich, "map_mitre": mitre} ) norm = normalize_result(raw) return ( norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw, ) def ui_lookup_hash(h, ht, enrich, mitre): raw = call_task( "lookup_hash", {"hash": h, "hash_type": ht, "enrich": enrich, "map_mitre": mitre}, ) norm = normalize_result(raw) return ( norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw, ) def ui_correlate_iocs(iocs): lst = [x.strip() for x in iocs.splitlines() if x.strip()] raw = call_task("correlate_iocs", {"iocs": lst}) norm = normalize_result(raw) return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], raw def ui_quickscan(target): raw = call_task("quickscan", {"target": target}) norm = normalize_result(raw) return norm["summary"], norm["markdown"], norm["json"], raw # --------------------------------------------------------------------- # MCP Bridge # --------------------------------------------------------------------- def ui_bridge(tool, args_json): try: payload = json.loads(args_json) except Exception as e: return json.dumps({"error": str(e)}, indent=2), "", {} raw = call_task(tool, payload) norm = normalize_result(raw) return norm["json"], norm["markdown"], raw # --------------------------------------------------------------------- # UI Layout # --------------------------------------------------------------------- def build_interface(): with gr.Blocks(title="Parrot OSINT MCP Console") as demo: gr.Markdown( "# 🦜 Parrot OSINT MCP Console\n" "Multi-mode Intelligence Workstation." ) # Holds the last OSINT result (dict) to inject into the copilot osint_state = gr.State({}) # ------------------------- # OSINT Dashboard # ------------------------- with gr.Tab("OSINT Dashboard"): # IP Lookup with gr.Tab("IP Lookup"): ip = gr.Textbox(label="IP Address", placeholder="8.8.8.8") enrich = gr.Checkbox(value=True, label="Enrich data") mitre = gr.Checkbox(value=True, label="MITRE ATT&CK Mapping") run = gr.Button("Run IP Lookup") out_s = gr.Textbox(label="Summary") out_md = gr.Markdown() out_json = gr.Code(language="json") out_mitre = gr.Code(language="json") out_stix = gr.Code(language="json") run.click( ui_lookup_ip, [ip, enrich, mitre], [out_s, out_md, out_json, out_mitre, out_stix, osint_state], ) # You can add more tabs here: Domain Lookup, Hash Lookup, Correlate IOCs, Quickscan # ------------------------- # MCP Bridge # ------------------------- with gr.Tab("MCP Bridge"): tool = gr.Dropdown(sorted(TASK_REGISTRY.keys()), label="Tool") args = gr.Code(language="json", label="Args JSON") btn = gr.Button("Run Tool") out_bridge_json = gr.Code(language="json") out_bridge_md = gr.Markdown() btn.click( ui_bridge, [tool, args], [out_bridge_json, out_bridge_md, osint_state], ) # ------------------------- # Analyst Copilot # ------------------------- with gr.Tab("Analyst Copilot"): gr.Markdown( "### WhiteRabbit Neo + Cybertron Threat Intelligence Assistant" ) system_prompt = gr.Textbox( label="System Prompt", value=( "You are a threat intelligence analyst. " "You classify TTPs, extract indicators, map MITRE ATT&CK, " "and provide investigation guidance." ), ) model_select = gr.Dropdown( label="LLM Model", choices=[ "berkeley-nest/WhiteRabbitNeo-8B", "cybertronai/cybertron-1.1-1b", "cybertronai/cybertron-1.1-7b", "cybertronai/cybertron-1.1-32b", ], value="berkeley-nest/WhiteRabbitNeo-8B", ) gr.Markdown( "### HuggingFace API Token (required for LLM inference)" ) hf_token = gr.Textbox( label="HF Token", type="password", placeholder="hf_xxx...", ) # Chat history state for the copilot (list of messages) chat_state = gr.State([]) chatbot = gr.ChatInterface( respond, type="messages", additional_inputs=[ system_prompt, model_select, hf_token, gr.Slider( 0.1, 2.0, value=0.7, step=0.1, label="Temperature", ), gr.Slider( 0.1, 1.0, value=0.95, step=0.05, label="Top-p", ), gr.Slider( 32, 4096, value=512, step=32, label="Max Tokens", ), ], state=chat_state, ) inject_btn = gr.Button("Inject Last OSINT Result into Copilot") inject_btn.click( inject_osint, inputs=[chat_state, osint_state], outputs=[chat_state], ) return demo # --------------------------------------------------------------------- # MAIN ENTRY # --------------------------------------------------------------------- if __name__ == "__main__": demo = build_interface() demo.launch()