#!/usr/bin/env python
"""
Parrot OSINT MCP – Gradio Frontend

Modes:
 - "OSINT Dashboard"  (multi-tool, opinionated)
 - "MCP Bridge"       (raw tool_name + JSON args → JSON result)
 - "Analyst Copilot"  (streaming LLM with OSINT context injection)
"""

import json
import traceback
from typing import Any, Dict

import gradio as gr
from huggingface_hub import InferenceClient

# ---------------------------------------------------------------------
# Task registry: adapt this to your actual task API
# ---------------------------------------------------------------------

# Maps tool name -> callable ``run(**kwargs) -> dict`` from tasks.<module>.
TASK_REGISTRY: Dict[str, Any] = {}


def _register_tasks() -> None:
    """Populate TASK_REGISTRY from the optional ``tasks.*`` modules.

    Registration is deliberately best-effort: a task module that is
    missing or broken is skipped silently so the console still starts
    with whatever tools are available.
    """

    def _try_register(name: str, module_name: str) -> None:
        try:
            module = __import__(f"tasks.{module_name}", fromlist=["*"])
            fn = getattr(module, "run", None)
            if callable(fn):
                TASK_REGISTRY[name] = fn
        except Exception:
            # Best-effort by design: an unavailable task must not prevent
            # the rest of the UI from loading.
            pass

    _try_register("lookup_ip", "lookup_ip")
    _try_register("lookup_domain", "lookup_domain")
    _try_register("lookup_hash", "lookup_hash")
    _try_register("correlate_iocs", "correlate_iocs")
    _try_register("generate_report", "generate_report")
    _try_register("enrich_entity", "enrich_entity")
    _try_register("mitre_map", "mitre_map")
    _try_register("quickscan", "quickscan")


_register_tasks()

# ---------------------------------------------------------------------
# Core execution helpers
# ---------------------------------------------------------------------


def call_task(tool_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Invoke a registered task and always return a dict.

    Errors (unknown tool, bad arguments, task exceptions) are reported as
    ``{"error": ...}`` payloads instead of raising, so Gradio callbacks
    never crash the event loop.

    :param tool_name: key into TASK_REGISTRY.
    :param payload: keyword arguments forwarded to the task's ``run``.
    """
    fn = TASK_REGISTRY.get(tool_name)
    if not fn:
        return {
            "error": (
                f"Unknown tool '{tool_name}'. "
                f"Registered tools: {sorted(TASK_REGISTRY.keys())}"
            )
        }
    try:
        result = fn(**payload)
        # Normalize non-dict results so every consumer can treat the
        # return value uniformly.
        if not isinstance(result, dict):
            result = {"result": result}
        return result
    except Exception as exc:
        return {
            "error": f"Exception in tool '{tool_name}': {exc}",
            "traceback": traceback.format_exc(),
        }


def format_result_for_ui(result: Dict[str, Any]) -> Dict[str, str]:
    """Project a raw task result onto the string fields the UI widgets show.

    Returns keys: summary, markdown, json, mitre, stix, sarif — all
    strings; absent sections map to "" so their code panels stay blank.
    """

    def _dump_section(key: str) -> str:
        # Empty string (not "null") when the section is absent/falsy.
        value = result.get(key)
        return json.dumps(value, indent=2, default=str) if value else ""

    markdown = result.get("markdown") or result.get("report") or ""
    if not markdown and "summary" in result:
        markdown = f"## Summary\n\n{result['summary']}"

    return {
        "summary": result.get("summary", ""),
        "markdown": markdown,
        "json": json.dumps(result, indent=2, default=str),
        "mitre": _dump_section("mitre"),
        "stix": _dump_section("stix"),
        "sarif": _dump_section("sarif"),
    }


# ---------------------------------------------------------------------
# MODE C — ANALYST COPILOT (LLM)
# ---------------------------------------------------------------------


def respond(message, history, system_message, model, hf_token, temperature, top_p, max_tokens):
    """
    Streaming LLM response using HuggingFace InferenceClient.
    Supports injecting OSINT task results into the conversation.

    Yields the accumulated response text after each streamed chunk.
    """
    # BUGFIX: hf_token.token crashed when the user had not logged in.
    # Fall back to None and let InferenceClient use ambient credentials.
    token = getattr(hf_token, "token", None) if hf_token is not None else None
    client = InferenceClient(token=token, model=model)

    messages = [{"role": "system", "content": system_message}]
    messages.extend(history)  # history is already in "messages" dict format
    messages.append({"role": "user", "content": message})

    response_text = ""
    for chunk in client.chat_completion(
        messages=messages,
        temperature=temperature,
        top_p=top_p,
        max_tokens=max_tokens,
        stream=True,
    ):
        # BUGFIX: some stream chunks (role-only or final chunks) carry an
        # empty ``choices`` list or no content delta — skip them instead
        # of raising IndexError / appending None.
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            response_text += delta
            yield response_text


def inject_osint_context(history, task_result: Dict[str, Any]):
    """
    Inject JSON + summary + MITRE mappings directly into the chat history.

    Appends the serialized task result as a system message so the next
    LLM turn can reason over it; returns the mutated history.
    """
    pretty = json.dumps(task_result, indent=2, default=str)
    blob = f"""
### OSINT Result Injected:

{pretty}
"""
    history.append({"role": "system", "content": blob})
    return history


# ---------------------------------------------------------------------
# Dashboard callbacks (Mode B)
# ---------------------------------------------------------------------


def _run_and_format(tool: str, payload: Dict[str, Any], fields) -> tuple:
    """Run a task and return the requested formatted fields plus the raw dict."""
    raw = call_task(tool, payload)
    normal = format_result_for_ui(raw)
    return tuple(normal[f] for f in fields) + (raw,)


def ui_lookup_ip(ip, enrich, mitre):
    """Dashboard callback: IP lookup -> (summary, md, json, mitre, stix, raw)."""
    return _run_and_format(
        "lookup_ip",
        {"ip": ip, "enrich": enrich, "map_mitre": mitre},
        ("summary", "markdown", "json", "mitre", "stix"),
    )


def ui_lookup_domain(domain, enrich, mitre):
    """Dashboard callback: domain lookup -> (summary, md, json, mitre, stix, raw)."""
    return _run_and_format(
        "lookup_domain",
        {"domain": domain, "enrich": enrich, "map_mitre": mitre},
        ("summary", "markdown", "json", "mitre", "stix"),
    )


def ui_lookup_hash(h, ht, enrich, mitre):
    """Dashboard callback: hash lookup -> (summary, md, json, mitre, stix, raw)."""
    return _run_and_format(
        "lookup_hash",
        {"hash": h, "hash_type": ht, "enrich": enrich, "map_mitre": mitre},
        ("summary", "markdown", "json", "mitre", "stix"),
    )


def ui_correlate_iocs(iocs):
    """Dashboard callback: correlate a newline-separated IOC list."""
    parsed = [line.strip() for line in iocs.splitlines() if line.strip()]
    return _run_and_format(
        "correlate_iocs",
        {"iocs": parsed},
        ("summary", "markdown", "json", "mitre"),
    )


def ui_quickscan(target):
    """Dashboard callback: quick scan of a single target."""
    return _run_and_format(
        "quickscan",
        {"target": target},
        ("summary", "markdown", "json"),
    )


# ---------------------------------------------------------------------
# MCP Bridge (Mode D)
# ---------------------------------------------------------------------


def ui_mcp_bridge(tool, args_json):
    """MCP Bridge callback: parse raw JSON args, call the tool.

    Returns (pretty JSON, markdown, raw dict). Parse errors are reported
    in the JSON panel rather than raised.
    """
    try:
        payload = json.loads(args_json)
    except Exception as exc:
        err = {"error": f"Invalid JSON: {exc}"}
        return json.dumps(err, indent=2), "", err
    # BUGFIX: a non-object JSON value (e.g. a list) used to surface as an
    # opaque "argument after ** must be a mapping" TypeError downstream.
    if not isinstance(payload, dict):
        err = {"error": "Args JSON must be an object mapping argument names to values."}
        return json.dumps(err, indent=2), "", err
    raw = call_task(tool, payload)
    normal = format_result_for_ui(raw)
    return normal["json"], normal["markdown"], raw


# ---------------------------------------------------------------------
# UI — Now with Analyst Copilot
# ---------------------------------------------------------------------


def build_interface() -> gr.Blocks:
    """Assemble the three-tab Gradio console and return the Blocks app."""
    with gr.Blocks(title="Parrot OSINT MCP Console") as demo:
        gr.Markdown("# Parrot OSINT MCP Console")

        # Store OSINT task results for injection into the Copilot
        osint_result_state = gr.State([])

        # ------------------------------------------
        # MODE B — Dashboard
        # ------------------------------------------
        with gr.Tab("OSINT Dashboard"):
            with gr.Tab("IP Lookup"):
                ip = gr.Textbox(label="IP Address")
                enrich = gr.Checkbox(value=True, label="Enrichment")
                mitre = gr.Checkbox(value=True, label="MITRE mapping")
                btn = gr.Button("Run")
                summary = gr.Textbox(label="Summary")
                md = gr.Markdown()
                js = gr.Code(language="json")
                mt = gr.Code(language="json")
                st = gr.Code(language="json")
                btn.click(
                    ui_lookup_ip,
                    inputs=[ip, enrich, mitre],
                    outputs=[summary, md, js, mt, st, osint_result_state],
                )

            # You already know: similar tabs for domain, hash, correlation, quickscan
            # (keeping focus on Copilot integration)

        # ------------------------------------------
        # MODE D — MCP Bridge
        # ------------------------------------------
        with gr.Tab("MCP Bridge"):
            tool = gr.Dropdown(sorted(TASK_REGISTRY.keys()))
            args = gr.Code(label="Args JSON")
            out_js = gr.Code(language="json")
            out_md = gr.Markdown()
            bridge_btn = gr.Button("Call Tool")
            bridge_btn.click(
                ui_mcp_bridge,
                inputs=[tool, args],
                outputs=[out_js, out_md, osint_result_state],
            )

        # ------------------------------------------
        # MODE C — Analyst Copilot
        # ------------------------------------------
        with gr.Tab("Analyst Copilot"):
            gr.Markdown("### Streaming TI Assistant with OSINT Context Injection")

            system_msg = gr.Textbox(
                label="System Prompt",
                value=(
                    "You are a threat intelligence analyst. "
                    "You think slowly, explain clearly, identify TTPs, "
                    "and recommend next investigative steps."
                ),
            )
            model = gr.Textbox(
                label="HF Model (e.g., openai/gpt-oss-20b)",
                value="openai/gpt-oss-20b",
            )

            chatbot = gr.ChatInterface(
                respond,
                additional_inputs=[
                    system_msg,
                    model,
                    gr.OAuthToken(label="HF Token"),
                    gr.Slider(0.1, 2.0, value=0.7, step=0.1, label="Temperature"),
                    gr.Slider(0.1, 1.0, value=0.95, step=0.05, label="Top-p"),
                    gr.Slider(1, 2048, value=512, step=1, label="Max Tokens"),
                ],
                type="messages",
            )

            inject_btn = gr.Button("Inject Latest OSINT Result")
            # NOTE(review): ``_chatbot_state`` is a private ChatInterface
            # attribute and may break across Gradio versions — confirm
            # against the pinned Gradio release before upgrading.
            inject_btn.click(
                inject_osint_context,
                inputs=[chatbot._chatbot_state, osint_result_state],
                outputs=[chatbot._chatbot_state],
            )

    return demo


if __name__ == "__main__":
    demo = build_interface()
    demo.launch()