Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python | |
| """ | |
| Parrot OSINT MCP – Gradio Frontend | |
| Modes: | |
| - OSINT Dashboard (deterministic intelligence) | |
| - MCP Bridge (raw tool access) | |
| - Analyst Copilot (LLM interpretive intelligence) | |
| """ | |
| import json | |
| import traceback | |
| from typing import Any, Dict | |
| import gradio as gr | |
| from huggingface_hub import InferenceClient | |
| # --------------------------------------------------------------------- | |
| # Task Registry (auto-loads MCP tasks dynamically) | |
| # --------------------------------------------------------------------- | |
# Global tool registry: task name -> its run(**kwargs) callable.
# Populated once at import time by _register_tasks() below.
TASK_REGISTRY: Dict[str, Any] = {}
| def _register_tasks(): | |
| """ | |
| Import tasks.* modules dynamically and pull their run() functions. | |
| Missing modules are ignored so the UI can still boot. | |
| """ | |
| def _try(name: str, module: str): | |
| try: | |
| m = __import__(f"tasks.{module}", fromlist=["*"]) | |
| fn = getattr(m, "run", None) | |
| if callable(fn): | |
| TASK_REGISTRY[name] = fn | |
| except Exception: | |
| # In Spaces, you might not have all tasks yet; that's fine. | |
| pass | |
| _try("lookup_ip", "lookup_ip") | |
| _try("lookup_domain", "lookup_domain") | |
| _try("lookup_hash", "lookup_hash") | |
| _try("correlate_iocs", "correlate_iocs") | |
| _try("generate_report", "generate_report") | |
| _try("enrich_entity", "enrich_entity") | |
| _try("mitre_map", "mitre_map") | |
| _try("quickscan", "quickscan") | |
# Build the registry at import time so the UI can list available tools immediately.
_register_tasks()
| # --------------------------------------------------------------------- | |
| # Task Execution + Normalization | |
| # --------------------------------------------------------------------- | |
def call_task(name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Dispatch a registered task by name, always returning a dict.

    Args:
        name: Key into TASK_REGISTRY.
        payload: Keyword arguments forwarded to the task's run().

    Returns:
        The task's dict result; non-dict results are wrapped as
        {"result": ...}, and failures as {"error": ..., "traceback": ...}.
    """
    task_fn = TASK_REGISTRY.get(name)
    if not task_fn:
        return {"error": f"Unknown tool '{name}'."}
    try:
        outcome = task_fn(**payload)
    except Exception as exc:
        return {"error": str(exc), "traceback": traceback.format_exc()}
    return outcome if isinstance(outcome, dict) else {"result": outcome}
def normalize_result(res: Dict[str, Any]) -> Dict[str, str]:
    """Flatten a raw task result into the fixed set of strings the UI renders."""

    def render(section: Any) -> str:
        # Empty/missing sections render as "" so their UI panes stay blank.
        return json.dumps(section, indent=2, default=str) if section else ""

    summary = res.get("summary", "")
    markdown = res.get("markdown") or res.get("report") or ""
    if summary and not markdown:
        # Fall back to the summary when the task produced no markdown/report.
        markdown = f"## Summary\n\n{summary}"
    return {
        "summary": summary,
        "markdown": markdown,
        "json": json.dumps(res, indent=2, default=str),
        "mitre": render(res.get("mitre")),
        "stix": render(res.get("stix")),
        "sarif": render(res.get("sarif")),
    }
| # --------------------------------------------------------------------- | |
| # Analyst Copilot LLM | |
| # --------------------------------------------------------------------- | |
def respond(
    message,
    history,
    system_prompt,
    model_name,
    hf_token,
    temperature,
    top_p,
    max_tokens,
):
    """Stream a chat completion from the selected hosted model.

    Yields the accumulated reply text after every streamed chunk so
    Gradio renders incremental output.

    Args:
        message: Latest user message (str).
        history: Prior turns as {"role", "content"} dicts (Gradio
            "messages" format; extra keys are stripped before sending).
        system_prompt: System instruction prepended to the conversation.
        model_name: Hugging Face Hub model id.
        hf_token: Raw HF API token string entered by the user.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling cutoff.
        max_tokens: Generation length cap.
    """
    # Fail fast with a readable message instead of a cryptic 401 traceback.
    if not hf_token or not str(hf_token).strip():
        yield "⚠️ A Hugging Face API token is required. Enter one in the 'HF Token' field."
        return

    client = InferenceClient(
        model=model_name,
        token=hf_token,  # Direct string token
    )

    msgs = [{"role": "system", "content": system_prompt}]
    # Gradio message dicts may carry extra keys (e.g. "metadata") that the
    # inference API rejects; forward only role/content.
    for turn in history:
        if isinstance(turn, dict) and "role" in turn:
            msgs.append({"role": turn["role"], "content": turn.get("content", "")})
    msgs.append({"role": "user", "content": message})

    buffer = ""
    try:
        for chunk in client.chat_completion(
            messages=msgs,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            stream=True,
        ):
            delta = chunk.choices[0].delta.content
            if delta:
                buffer += delta
                yield buffer
    except Exception as e:
        # Surface partial output plus the error instead of crashing the chat UI.
        yield buffer + f"\n\n⚠️ Inference error: {e}"
def inject_osint(history, osint_obj):
    """Append the latest OSINT result to the copilot history as a system message.

    Args:
        history: Chat message list held in gr.State; mutated in place.
        osint_obj: Raw result dict from the last dashboard/bridge run.

    Returns:
        The same history list, with one extra system message appended.
    """
    serialized = json.dumps(osint_obj, indent=2, default=str)
    injected = {
        "role": "system",
        "content": f"### Injected OSINT Result\n```\n{serialized}\n```",
    }
    history.append(injected)
    return history
| # --------------------------------------------------------------------- | |
| # OSINT Dashboard Callbacks | |
| # --------------------------------------------------------------------- | |
def ui_lookup_ip(ip, enrich, mitre):
    """Dashboard callback: run lookup_ip and fan its sections out to the panes."""
    result = call_task("lookup_ip", {"ip": ip, "enrich": enrich, "map_mitre": mitre})
    view = normalize_result(result)
    # Final element feeds osint_state so the copilot can later ingest the raw dict.
    return (
        view["summary"],
        view["markdown"],
        view["json"],
        view["mitre"],
        view["stix"],
        result,
    )
def ui_lookup_domain(domain, enrich, mitre):
    """Dashboard callback: run lookup_domain and fan its sections out to the panes."""
    payload = {"domain": domain, "enrich": enrich, "map_mitre": mitre}
    result = call_task("lookup_domain", payload)
    view = normalize_result(result)
    # Final element feeds osint_state so the copilot can later ingest the raw dict.
    return (
        view["summary"],
        view["markdown"],
        view["json"],
        view["mitre"],
        view["stix"],
        result,
    )
def ui_lookup_hash(h, ht, enrich, mitre):
    """Dashboard callback: run lookup_hash and fan its sections out to the panes."""
    payload = {"hash": h, "hash_type": ht, "enrich": enrich, "map_mitre": mitre}
    result = call_task("lookup_hash", payload)
    view = normalize_result(result)
    # Final element feeds osint_state so the copilot can later ingest the raw dict.
    return (
        view["summary"],
        view["markdown"],
        view["json"],
        view["mitre"],
        view["stix"],
        result,
    )
def ui_correlate_iocs(iocs):
    """Dashboard callback: correlate IOCs, one indicator per input line."""
    # Split the textarea into trimmed, non-empty lines.
    indicators = [line.strip() for line in iocs.splitlines() if line.strip()]
    result = call_task("correlate_iocs", {"iocs": indicators})
    view = normalize_result(result)
    return view["summary"], view["markdown"], view["json"], view["mitre"], result
def ui_quickscan(target):
    """Dashboard callback: run a quickscan against a single target."""
    result = call_task("quickscan", {"target": target})
    view = normalize_result(result)
    return view["summary"], view["markdown"], view["json"], result
| # --------------------------------------------------------------------- | |
| # MCP Bridge | |
| # --------------------------------------------------------------------- | |
def ui_bridge(tool, args_json):
    """MCP Bridge callback: run any registered tool with raw JSON kwargs.

    Args:
        tool: Registered tool name selected in the dropdown.
        args_json: JSON object text from the gr.Code editor; may be
            None/blank (treated as no arguments).

    Returns:
        (pretty_json, markdown, raw_result) for the bridge panes and the
        shared osint_state. On bad input, raw_result is {} so osint_state
        never holds a non-dict.
    """
    try:
        # gr.Code may hand us None or an empty string; treat both as "{}".
        payload = json.loads(args_json) if args_json and args_json.strip() else {}
    except Exception as e:
        return json.dumps({"error": str(e)}, indent=2), "", {}
    # call_task unpacks with **payload, so anything but an object would
    # surface as a confusing TypeError; reject it with a clear message.
    if not isinstance(payload, dict):
        return (
            json.dumps(
                {"error": "Args JSON must be an object of keyword arguments."},
                indent=2,
            ),
            "",
            {},
        )
    raw = call_task(tool, payload)
    norm = normalize_result(raw)
    return norm["json"], norm["markdown"], raw
| # --------------------------------------------------------------------- | |
| # UI Layout | |
| # --------------------------------------------------------------------- | |
def build_interface():
    """Build the full Gradio Blocks app (dashboard, bridge, copilot tabs).

    Returns:
        The gr.Blocks demo, ready for .launch().
    """
    with gr.Blocks(title="Parrot OSINT MCP Console") as demo:
        gr.Markdown(
            "# 🦜 Parrot OSINT MCP Console\n"
            "Multi-mode Intelligence Workstation."
        )
        # Holds the last OSINT result (dict) to inject into the copilot.
        osint_state = gr.State({})

        # -------------------------
        # OSINT Dashboard
        # -------------------------
        with gr.Tab("OSINT Dashboard"):
            # IP Lookup
            with gr.Tab("IP Lookup"):
                ip = gr.Textbox(label="IP Address", placeholder="8.8.8.8")
                enrich = gr.Checkbox(value=True, label="Enrich data")
                mitre = gr.Checkbox(value=True, label="MITRE ATT&CK Mapping")
                run = gr.Button("Run IP Lookup")
                out_s = gr.Textbox(label="Summary")
                out_md = gr.Markdown()
                out_json = gr.Code(language="json")
                out_mitre = gr.Code(language="json")
                out_stix = gr.Code(language="json")
                run.click(
                    ui_lookup_ip,
                    [ip, enrich, mitre],
                    [out_s, out_md, out_json, out_mitre, out_stix, osint_state],
                )
            # You can add more tabs here following the same pattern:
            # Domain Lookup, Hash Lookup, Correlate IOCs, Quickscan.

        # -------------------------
        # MCP Bridge
        # -------------------------
        with gr.Tab("MCP Bridge"):
            tool = gr.Dropdown(sorted(TASK_REGISTRY.keys()), label="Tool")
            args = gr.Code(language="json", label="Args JSON")
            btn = gr.Button("Run Tool")
            out_bridge_json = gr.Code(language="json")
            out_bridge_md = gr.Markdown()
            btn.click(
                ui_bridge,
                [tool, args],
                [out_bridge_json, out_bridge_md, osint_state],
            )

        # -------------------------
        # Analyst Copilot
        # -------------------------
        with gr.Tab("Analyst Copilot"):
            gr.Markdown(
                "### WhiteRabbit Neo + Cybertron Threat Intelligence Assistant"
            )
            system_prompt = gr.Textbox(
                label="System Prompt",
                value=(
                    "You are a threat intelligence analyst. "
                    "You classify TTPs, extract indicators, map MITRE ATT&CK, "
                    "and provide investigation guidance."
                ),
            )
            model_select = gr.Dropdown(
                label="LLM Model",
                choices=[
                    "berkeley-nest/WhiteRabbitNeo-8B",
                    "cybertronai/cybertron-1.1-1b",
                    "cybertronai/cybertron-1.1-7b",
                    "cybertronai/cybertron-1.1-32b",
                ],
                value="berkeley-nest/WhiteRabbitNeo-8B",
            )
            gr.Markdown(
                "### HuggingFace API Token (required for LLM inference)"
            )
            hf_token = gr.Textbox(
                label="HF Token",
                type="password",
                placeholder="hf_xxx...",
            )
            # Scratch chat history used by the inject button below.
            chat_state = gr.State([])
            # BUGFIX: gr.ChatInterface has no `state=` keyword; passing one
            # raised a TypeError while building the UI, which crashed the app
            # at startup (the Space's "Runtime error").
            gr.ChatInterface(
                respond,
                type="messages",
                additional_inputs=[
                    system_prompt,
                    model_select,
                    hf_token,
                    gr.Slider(
                        0.1,
                        2.0,
                        value=0.7,
                        step=0.1,
                        label="Temperature",
                    ),
                    gr.Slider(
                        0.1,
                        1.0,
                        value=0.95,
                        step=0.05,
                        label="Top-p",
                    ),
                    gr.Slider(
                        32,
                        4096,
                        value=512,
                        step=32,
                        label="Max Tokens",
                    ),
                ],
            )
            inject_btn = gr.Button("Inject Last OSINT Result into Copilot")
            # NOTE(review): chat_state is not wired into ChatInterface's own
            # internal history, so injected results are stored here but not
            # yet shown in the chat pane -- follow-up needed to surface them
            # (e.g. via the ChatInterface's exposed chatbot component).
            inject_btn.click(
                inject_osint,
                inputs=[chat_state, osint_state],
                outputs=[chat_state],
            )
    return demo
| # --------------------------------------------------------------------- | |
| # MAIN ENTRY | |
| # --------------------------------------------------------------------- | |
if __name__ == "__main__":
    # Launch the console when executed directly (e.g. `python app.py`).
    build_interface().launch()